summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java196
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java34
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java174
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java4
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java4
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java2
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java6
7 files changed, 69 insertions, 351 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
deleted file mode 100644
index 24144686..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.util.LinkedList;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.message.Forward;
-
-public class EventQueueTest {
-
- private static final int MAX_SIZE = 5;
- private static final long MAX_AGE_MS = 3000L;
-
- private static final String MY_SOURCE = "my.source";
- private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB;
- private static final String MY_TOPIC = "my.topic";
- private static final String MY_PAYLOAD = "my.payload";
- private static final String MY_REQID = "my.request.id";
-
- private EventQueue queue;
-
- @Before
- public void setUp() {
- queue = new EventQueue(MAX_SIZE, MAX_AGE_MS);
-
- }
-
- @Test
- public void testEventQueue() {
- // shouldn't generate an exception
- new EventQueue(1, 1);
- }
-
- @Test
- public void testClear() {
- // add some items
- queue.add(makeActive());
- queue.add(makeActive());
-
- assertFalse(queue.isEmpty());
-
- queue.clear();
-
- // should be empty now
- assertTrue(queue.isEmpty());
- }
-
- @Test
- public void testIsEmpty() {
- // test when empty
- assertTrue(queue.isEmpty());
-
- // all active
- Forward msg1 = makeActive();
- Forward msg2 = makeActive();
- queue.add(msg1);
- assertFalse(queue.isEmpty());
-
- queue.add(msg2);
- assertFalse(queue.isEmpty());
-
- assertEquals(msg1, queue.poll());
- assertFalse(queue.isEmpty());
-
- assertEquals(msg2, queue.poll());
- assertTrue(queue.isEmpty());
-
- // active, expired, expired, active
- queue.add(msg1);
- queue.add(makeInactive());
- queue.add(makeInactive());
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertFalse(queue.isEmpty());
-
- assertEquals(msg2, queue.poll());
- assertTrue(queue.isEmpty());
- }
-
- @Test
- public void testSize() {
- queue = new EventQueue(2, 1000L);
- assertEquals(0, queue.size());
-
- queue.add(makeActive());
- assertEquals(1, queue.size());
-
- queue.poll();
- assertEquals(0, queue.size());
-
- queue.add(makeActive());
- queue.add(makeActive());
- assertEquals(2, queue.size());
-
- queue.poll();
- assertEquals(1, queue.size());
-
- queue.poll();
- assertEquals(0, queue.size());
- }
-
- @Test
- public void testAdd() {
- int nextra = 3;
-
- // create excess messages
- LinkedList<Forward> msgs = new LinkedList<>();
- for (int x = 0; x < MAX_SIZE + nextra; ++x) {
- msgs.add(makeActive());
- }
-
- // add them to the queue
- msgs.forEach(msg -> queue.add(msg));
-
- // should not have added too many messages
- assertEquals(MAX_SIZE, queue.size());
-
- // should have discarded the first "nextra" items
- for (int x = 0; x < MAX_SIZE; ++x) {
- assertEquals("x=" + x, msgs.get(x + nextra), queue.poll());
- }
-
- assertEquals(null, queue.poll());
- }
-
- @Test
- public void testPoll() {
- // poll when empty
- assertNull(queue.poll());
-
- // all active
- Forward msg1 = makeActive();
- Forward msg2 = makeActive();
- queue.add(msg1);
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertEquals(msg2, queue.poll());
- assertEquals(null, queue.poll());
-
- // active, expired, expired, active
- queue.add(msg1);
- queue.add(makeInactive());
- queue.add(makeInactive());
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertEquals(msg2, queue.poll());
- assertEquals(null, queue.poll());
-
- // one that's close to the age limit
- msg1 = makeActive();
- msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100);
- queue.add(msg1);
- assertEquals(msg1, queue.poll());
- assertEquals(null, queue.poll());
- }
-
- private Forward makeActive() {
- return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
- }
-
- private Forward makeInactive() {
- Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
-
- msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100);
-
- return msg;
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index 298af064..6280ebed 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -28,9 +28,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
@@ -121,6 +123,7 @@ public class FeatureTest2 {
// these are saved and restored on exit from this test class
private static PoolingFeature.Factory saveFeatureFactory;
private static PoolingManagerImpl.Factory saveManagerFactory;
+ private static DmaapManager.Factory saveDmaapFactory;
/**
* Sink for external DMaaP topic.
@@ -128,6 +131,11 @@ public class FeatureTest2 {
private static TopicSink externalSink;
/**
+ * Sink for internal DMaaP topic.
+ */
+ private static TopicSink internalSink;
+
+ /**
* Context for the current test case.
*/
private Context ctx;
@@ -137,18 +145,23 @@ public class FeatureTest2 {
public static void setUpBeforeClass() {
saveFeatureFactory = PoolingFeature.getFactory();
saveManagerFactory = PoolingManagerImpl.getFactory();
+ saveDmaapFactory = DmaapManager.getFactory();
- Properties props = makeSinkProperties(EXTERNAL_TOPIC);
- externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+ externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
externalSink.start();
+
+ internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+ internalSink.start();
}
@AfterClass
public static void tearDownAfterClass() {
PoolingFeature.setFactory(saveFeatureFactory);
PoolingManagerImpl.setFactory(saveManagerFactory);
+ DmaapManager.setFactory(saveDmaapFactory);
externalSink.stop();
+ internalSink.stop();
}
@Before
@@ -443,6 +456,7 @@ public class FeatureTest2 {
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
private final TopicSource externalSource;
+ private final TopicSource internalSource;
// mock objects
private final PolicyEngine engine = mock(PolicyEngine.class);
@@ -458,8 +472,8 @@ public class FeatureTest2 {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
- Properties props = makeSourceProperties(EXTERNAL_TOPIC);
- externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+ externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+ internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
@@ -489,6 +503,18 @@ public class FeatureTest2 {
* "DMaaP" topic and its own internal "DMaaP" topic.
*/
public void start() {
+ DmaapManager.setFactory(new DmaapManager.Factory() {
+ @Override
+ public List<TopicSource> getTopicSources() {
+ return Arrays.asList(internalSource, externalSource);
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks() {
+ return Arrays.asList(internalSink, externalSink);
+ }
+ });
+
feature.beforeStart(engine);
feature.afterCreate(controller);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 64573ab0..d74b87f9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -23,7 +23,6 @@ package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -106,7 +105,6 @@ public class PoolingManagerImplTest {
private Properties plainProps;
private PoolingProperties poolProps;
private ListeningController controller;
- private EventQueue eventQueue;
private ClassExtractors extractors;
private DmaapManager dmaap;
private ScheduledThreadPoolExecutor sched;
@@ -146,14 +144,12 @@ public class PoolingManagerImplTest {
active = new CountDownLatch(1);
factory = mock(Factory.class);
- eventQueue = mock(EventQueue.class);
extractors = mock(ClassExtractors.class);
dmaap = mock(DmaapManager.class);
controller = mock(ListeningController.class);
sched = mock(ScheduledThreadPoolExecutor.class);
drools = mock(DroolsController.class);
- when(factory.makeEventQueue(any())).thenReturn(eventQueue);
when(factory.makeClassExtractors(any())).thenReturn(extractors);
when(factory.makeDmaapManager(any())).thenReturn(dmaap);
when(factory.makeScheduler()).thenReturn(sched);
@@ -304,24 +300,25 @@ public class PoolingManagerImplTest {
@Test
public void testBeforeStop() throws Exception {
startMgr();
- mgr.startDistributing(makeAssignments(true));
+ mgr.startDistributing(makeAssignments(false));
- // verify that this message is not queued
Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
mgr.handle(msg);
- verify(eventQueue, never()).add(msg);
+ verify(dmaap, times(START_PUB+1)).publish(any());
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
+ verify(dmaap, times(START_PUB+2)).publish(any());
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
- // verify that next message is queued
- mgr.handle(msg);
- verify(eventQueue).add(msg);
+ // verify that next message is handled locally
+ mgr.handle(msg);
+ verify(dmaap, times(START_PUB+2)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
}
@Test
@@ -360,26 +357,8 @@ public class PoolingManagerImplTest {
startMgr();
mgr.beforeStop();
- when(eventQueue.isEmpty()).thenReturn(false);
- when(eventQueue.size()).thenReturn(3);
-
- mgr.afterStop();
-
- verify(eventQueue).clear();
- verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
- }
-
- @Test
- public void testAfterStop_EmptyQueue() throws Exception {
- startMgr();
- mgr.beforeStop();
-
- when(eventQueue.isEmpty()).thenReturn(true);
- when(eventQueue.size()).thenReturn(0);
-
mgr.afterStop();
- verify(eventQueue, never()).clear();
verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@@ -499,18 +478,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testInternalTopicFailed() throws Exception {
- startMgr();
-
- CountDownLatch latch = mgr.internalTopicFailed();
-
- // wait for the thread to complete
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- verify(controller).stop();
- }
-
- @Test
public void testSchedule() throws Exception {
// must start the scheduler
startMgr();
@@ -693,14 +660,7 @@ public class PoolingManagerImplTest {
public void testBeforeInsert_NoIntercept() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
@@ -726,37 +686,20 @@ public class PoolingManagerImplTest {
startMgr();
assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- // should not have tried to enqueue a message
- verify(eventQueue, never()).add(any());
}
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
@@ -921,25 +864,26 @@ public class PoolingManagerImplTest {
@Test
public void testMakeForward() throws Exception {
startMgr();
-
- long tbegin = System.currentTimeMillis();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+
+ verify(dmaap, times(START_PUB+1)).publish(any());
}
@Test
public void testMakeForward_InvalidMsg() throws Exception {
startMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
- // should not have tried to enqueue a message
- verify(eventQueue, never()).add(any());
+ // should not have tried to publish a message
+ verify(dmaap, times(START_PUB)).publish(any());
}
@Test
@@ -1064,69 +1008,27 @@ public class PoolingManagerImplTest {
startMgr();
// route the message to this host
- assertNotNull(mgr.startDistributing(makeAssignments(true)));
+ mgr.startDistributing(makeAssignments(true));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue, never()).add(any());
+ verify(dmaap, times(START_PUB)).publish(any());
- // null assignments should cause message to be queued
- assertNull(mgr.startDistributing(null));
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
+ // null assignments should cause message to be processed locally
+ mgr.startDistributing(null);
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(dmaap, times(START_PUB)).publish(any());
// route the message to this host
- assertNotNull(mgr.startDistributing(makeAssignments(true)));
+ mgr.startDistributing(makeAssignments(true));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
+ verify(dmaap, times(START_PUB)).publish(any());
// route the message to the other host
- assertNotNull(mgr.startDistributing(makeAssignments(false)));
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
- }
-
- @Test
- public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
- startMgr();
-
- // put items in the queue
- LinkedList<Forward> lst = new LinkedList<>();
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
- when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
- // route the messages to this host
- CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- // all of the events should have been processed locally
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testStartDistributing_EventsInQueue_Forward() throws Exception {
- startMgr();
-
- // put items in the queue
- LinkedList<Forward> lst = new LinkedList<>();
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
- when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
- // route the messages to the OTHER host
- CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- // all of the events should have been forwarded
- verify(dmaap, times(4)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ verify(dmaap, times(START_PUB+1)).publish(any());
}
@Test
@@ -1228,22 +1130,6 @@ public class PoolingManagerImplTest {
}
/**
- * Validates the message content.
- *
- * @param tbegin creation time stamp must be no less than this
- * @param msg message to be validated
- */
- private void validateMessageContent(long tbegin, Forward msg) {
- assertEquals(0, msg.getNumHops());
- assertTrue(msg.getCreateTimeMs() >= tbegin);
- assertEquals(mgr.getHost(), msg.getSource());
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
- assertEquals(TOPIC2, msg.getTopic());
- assertEquals(THE_EVENT, msg.getPayload());
- assertEquals(REQUEST_ID, msg.getRequestId());
- }
-
- /**
* Configure the mock controller to act like a real controller, invoking beforeOffer
* and then beforeInsert, so we can make sure they pass through. We'll keep count to
* ensure we don't get into infinite recursion.
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index 27284dcd..f2701038 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -394,8 +394,8 @@ public class ActiveStateTest extends BasicStateTester {
// fire the task - should transition
assertEquals(next, task.third().fire());
- // should stop distributing
- verify(mgr).startDistributing(null);
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
// should publish an offline message
Offline msg = captureAdminMessage(Offline.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index 80778ed4..7cd37581 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -257,8 +257,8 @@ public class QueryStateTest extends BasicStateTester {
assertEquals(next, timer.second().fire());
- // should stop distributing
- verify(mgr).startDistributing(null);
+ // should continue distributing
+ verify(mgr, never()).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 01f49b55..18f12ff8 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -111,7 +111,7 @@ public class StartStateTest extends BasicStateTester {
assertEquals(next, checker.second().fire());
- verify(mgr).internalTopicFailed();
+ verify(mgr).startDistributing(null);
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index cdf9b59a..e3d383d9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -423,7 +423,8 @@ public class StateTest extends BasicStateTester {
State next2 = state.missedHeartbeat();
assertEquals(next, next2);
- verify(mgr).startDistributing(null);
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());
@@ -437,7 +438,8 @@ public class StateTest extends BasicStateTester {
State next2 = state.internalTopicFailed();
assertEquals(next, next2);
- verify(mgr).internalTopicFailed();
+ // should stop distributing
+ verify(mgr).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());