diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org')
7 files changed, 69 insertions, 351 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java deleted file mode 100644 index 24144686..00000000 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import java.util.LinkedList; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; -import org.onap.policy.drools.pooling.message.Forward; - -public class EventQueueTest { - - private static final int MAX_SIZE = 5; - private static final long MAX_AGE_MS = 3000L; - - private static final String MY_SOURCE = "my.source"; - private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB; - private static final String MY_TOPIC = "my.topic"; - private static final String MY_PAYLOAD = "my.payload"; - private static final String MY_REQID = "my.request.id"; - - private EventQueue queue; - - @Before - public void setUp() { - queue = new EventQueue(MAX_SIZE, MAX_AGE_MS); - - } - - @Test - public void testEventQueue() { - // shouldn't generate an exception - new EventQueue(1, 1); - } - - @Test - public void testClear() { - // add some items - queue.add(makeActive()); - queue.add(makeActive()); - - assertFalse(queue.isEmpty()); - - queue.clear(); - - // should be empty now - assertTrue(queue.isEmpty()); - } - - @Test - public void testIsEmpty() { - // test when empty - assertTrue(queue.isEmpty()); - - // all active - Forward msg1 = makeActive(); - Forward msg2 = makeActive(); - queue.add(msg1); - assertFalse(queue.isEmpty()); - - queue.add(msg2); - assertFalse(queue.isEmpty()); - - assertEquals(msg1, queue.poll()); - assertFalse(queue.isEmpty()); - - assertEquals(msg2, queue.poll()); - assertTrue(queue.isEmpty()); - - // active, expired, expired, active - queue.add(msg1); - queue.add(makeInactive()); - queue.add(makeInactive()); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertFalse(queue.isEmpty()); - - assertEquals(msg2, queue.poll()); - assertTrue(queue.isEmpty()); - } - - @Test - public void testSize() { - queue = new EventQueue(2, 1000L); - assertEquals(0, queue.size()); - - queue.add(makeActive()); - assertEquals(1, queue.size()); - - queue.poll(); - assertEquals(0, queue.size()); - - queue.add(makeActive()); - queue.add(makeActive()); - assertEquals(2, queue.size()); - - queue.poll(); - assertEquals(1, queue.size()); - - queue.poll(); - assertEquals(0, queue.size()); - } - - @Test - public void testAdd() { - int nextra = 3; - - // create excess messages - LinkedList<Forward> msgs = new LinkedList<>(); - for (int x = 0; x < MAX_SIZE + nextra; ++x) { - msgs.add(makeActive()); - } - - // add them to the queue - msgs.forEach(msg -> queue.add(msg)); - - // should not have added too many messages - assertEquals(MAX_SIZE, queue.size()); - - // should have discarded the first "nextra" items - for (int x = 0; x < MAX_SIZE; ++x) { - assertEquals("x=" + x, msgs.get(x + nextra), queue.poll()); - } - - assertEquals(null, queue.poll()); - } - - @Test - public void testPoll() { - // poll when empty - assertNull(queue.poll()); - - // all active - Forward msg1 = makeActive(); - Forward msg2 = makeActive(); - queue.add(msg1); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertEquals(msg2, queue.poll()); - assertEquals(null, queue.poll()); - - // active, expired, expired, active - queue.add(msg1); - queue.add(makeInactive()); - queue.add(makeInactive()); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertEquals(msg2, queue.poll()); - assertEquals(null, queue.poll()); - - // one that's close to the age limit - msg1 = makeActive(); - msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100); - queue.add(msg1); - assertEquals(msg1, queue.poll()); - assertEquals(null, queue.poll()); - } - - private Forward makeActive() { - return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID); - } - - private Forward makeInactive() { - Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID); - - msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100); - - return msg; - } - -} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java index 298af064..6280ebed 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java @@ -28,9 +28,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX; import java.io.IOException; +import java.util.Arrays; import java.util.Deque; import java.util.IdentityHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; @@ -121,6 +123,7 @@ public class FeatureTest2 { // these are saved and restored on exit from this test class private static PoolingFeature.Factory saveFeatureFactory; private static PoolingManagerImpl.Factory saveManagerFactory; + private static DmaapManager.Factory saveDmaapFactory; /** * Sink for external DMaaP topic. @@ -128,6 +131,11 @@ public class FeatureTest2 { private static TopicSink externalSink; /** + * Sink for internal DMaaP topic. + */ + private static TopicSink internalSink; + + /** * Context for the current test case. */ private Context ctx; @@ -137,18 +145,23 @@ public class FeatureTest2 { public static void setUpBeforeClass() { saveFeatureFactory = PoolingFeature.getFactory(); saveManagerFactory = PoolingManagerImpl.getFactory(); + saveDmaapFactory = DmaapManager.getFactory(); - Properties props = makeSinkProperties(EXTERNAL_TOPIC); - externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0); + externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0); externalSink.start(); + + internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0); + internalSink.start(); } @AfterClass public static void tearDownAfterClass() { PoolingFeature.setFactory(saveFeatureFactory); PoolingManagerImpl.setFactory(saveManagerFactory); + DmaapManager.setFactory(saveDmaapFactory); externalSink.stop(); + internalSink.stop(); } @Before @@ -443,6 +456,7 @@ public class FeatureTest2 { private final AtomicBoolean sawMsg = new AtomicBoolean(false); private final TopicSource externalSource; + private final TopicSource internalSource; // mock objects private final PolicyEngine engine = mock(PolicyEngine.class); @@ -458,8 +472,8 @@ public class FeatureTest2 { when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - Properties props = makeSourceProperties(EXTERNAL_TOPIC); - externalSource = TopicEndpoint.manager.addTopicSources(props).get(0); + externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0); + internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0); // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { @@ -489,6 +503,18 @@ public class FeatureTest2 { * "DMaaP" topic and its own internal "DMaaP" topic. */ public void start() { + DmaapManager.setFactory(new DmaapManager.Factory() { + @Override + public List<TopicSource> getTopicSources() { + return Arrays.asList(internalSource, externalSource); + } + + @Override + public List<TopicSink> getTopicSinks() { + return Arrays.asList(internalSink, externalSink); + } + }); + feature.beforeStart(engine); feature.afterCreate(controller); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 64573ab0..d74b87f9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -23,7 +23,6 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -106,7 +105,6 @@ public class PoolingManagerImplTest { private Properties plainProps; private PoolingProperties poolProps; private ListeningController controller; - private EventQueue eventQueue; private ClassExtractors extractors; private DmaapManager dmaap; private ScheduledThreadPoolExecutor sched; @@ -146,14 +144,12 @@ public class PoolingManagerImplTest { active = new CountDownLatch(1); factory = mock(Factory.class); - eventQueue = mock(EventQueue.class); extractors = mock(ClassExtractors.class); dmaap = mock(DmaapManager.class); controller = mock(ListeningController.class); sched = mock(ScheduledThreadPoolExecutor.class); drools = mock(DroolsController.class); - when(factory.makeEventQueue(any())).thenReturn(eventQueue); when(factory.makeClassExtractors(any())).thenReturn(extractors); when(factory.makeDmaapManager(any())).thenReturn(dmaap); when(factory.makeScheduler()).thenReturn(sched); @@ -304,24 +300,25 @@ public class PoolingManagerImplTest { @Test public void testBeforeStop() throws Exception { startMgr(); - mgr.startDistributing(makeAssignments(true)); + mgr.startDistributing(makeAssignments(false)); - // verify that this message is not queued Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); mgr.handle(msg); - verify(eventQueue, never()).add(msg); + verify(dmaap, times(START_PUB+1)).publish(any()); mgr.beforeStop(); verify(dmaap).stopConsumer(mgr); verify(sched).shutdownNow(); + verify(dmaap, times(START_PUB+2)).publish(any()); verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); - // verify that next message is queued - mgr.handle(msg); - verify(eventQueue).add(msg); + // verify that next message is handled locally + mgr.handle(msg); + verify(dmaap, times(START_PUB+2)).publish(any()); + verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); } @Test @@ -360,26 +357,8 @@ public class PoolingManagerImplTest { startMgr(); mgr.beforeStop(); - when(eventQueue.isEmpty()).thenReturn(false); - when(eventQueue.size()).thenReturn(3); - - mgr.afterStop(); - - verify(eventQueue).clear(); - verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); - } - - @Test - public void testAfterStop_EmptyQueue() throws Exception { - startMgr(); - mgr.beforeStop(); - - when(eventQueue.isEmpty()).thenReturn(true); - when(eventQueue.size()).thenReturn(0); - mgr.afterStop(); - verify(eventQueue, never()).clear(); verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @@ -499,18 +478,6 @@ public class PoolingManagerImplTest { } @Test - public void testInternalTopicFailed() throws Exception { - startMgr(); - - CountDownLatch latch = mgr.internalTopicFailed(); - - // wait for the thread to complete - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - verify(controller).stop(); - } - - @Test public void testSchedule() throws Exception { // must start the scheduler startMgr(); @@ -693,14 +660,7 @@ public class PoolingManagerImplTest { public void testBeforeInsert_NoIntercept() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test @@ -726,37 +686,20 @@ public class PoolingManagerImplTest { startMgr(); assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT)); - - // should not have tried to enqueue a message - verify(eventQueue, never()).add(any()); } @Test public void testHandleExternalCommInfrastructureStringStringString() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test public void testHandleExternalForward_NoAssignments() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test @@ -921,25 +864,26 @@ public class PoolingManagerImplTest { @Test public void testMakeForward() throws Exception { startMgr(); - - long tbegin = System.currentTimeMillis(); + + // route the message to another host + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + + verify(dmaap, times(START_PUB+1)).publish(any()); } @Test public void testMakeForward_InvalidMsg() throws Exception { startMgr(); + + // route the message to another host + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT)); - // should not have tried to enqueue a message - verify(eventQueue, never()).add(any()); + // should not have tried to publish a message + verify(dmaap, times(START_PUB)).publish(any()); } @Test @@ -1064,69 +1008,27 @@ public class PoolingManagerImplTest { startMgr(); // route the message to this host - assertNotNull(mgr.startDistributing(makeAssignments(true))); + mgr.startDistributing(makeAssignments(true)); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue, never()).add(any()); + verify(dmaap, times(START_PUB)).publish(any()); - // null assignments should cause message to be queued - assertNull(mgr.startDistributing(null)); - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); + // null assignments should cause message to be processed locally + mgr.startDistributing(null); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(dmaap, times(START_PUB)).publish(any()); // route the message to this host - assertNotNull(mgr.startDistributing(makeAssignments(true))); + mgr.startDistributing(makeAssignments(true)); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); + verify(dmaap, times(START_PUB)).publish(any()); // route the message to the other host - assertNotNull(mgr.startDistributing(makeAssignments(false))); + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); - } - - @Test - public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception { - startMgr(); - - // put items in the queue - LinkedList<Forward> lst = new LinkedList<>(); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - - when(eventQueue.poll()).thenAnswer(args -> lst.poll()); - - // route the messages to this host - CountDownLatch latch = mgr.startDistributing(makeAssignments(true)); - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - // all of the events should have been processed locally - verify(dmaap, times(START_PUB)).publish(any()); - verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); - } - - @Test - public void testStartDistributing_EventsInQueue_Forward() throws Exception { - startMgr(); - - // put items in the queue - LinkedList<Forward> lst = new LinkedList<>(); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - - when(eventQueue.poll()).thenAnswer(args -> lst.poll()); - - // route the messages to the OTHER host - CountDownLatch latch = mgr.startDistributing(makeAssignments(false)); - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - // all of the events should have been forwarded - verify(dmaap, times(4)).publish(any()); - verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); + verify(dmaap, times(START_PUB+1)).publish(any()); } @Test @@ -1228,22 +1130,6 @@ public class PoolingManagerImplTest { } /** - * Validates the message content. - * - * @param tbegin creation time stamp must be no less than this - * @param msg message to be validated - */ - private void validateMessageContent(long tbegin, Forward msg) { - assertEquals(0, msg.getNumHops()); - assertTrue(msg.getCreateTimeMs() >= tbegin); - assertEquals(mgr.getHost(), msg.getSource()); - assertEquals(CommInfrastructure.UEB, msg.getProtocol()); - assertEquals(TOPIC2, msg.getTopic()); - assertEquals(THE_EVENT, msg.getPayload()); - assertEquals(REQUEST_ID, msg.getRequestId()); - } - - /** * Configure the mock controller to act like a real controller, invoking beforeOffer * and then beforeInsert, so we can make sure they pass through. We'll keep count to * ensure we don't get into infinite recursion. diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index 27284dcd..f2701038 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -394,8 +394,8 @@ public class ActiveStateTest extends BasicStateTester { // fire the task - should transition assertEquals(next, task.third().fire()); - // should stop distributing - verify(mgr).startDistributing(null); + // should continue to distribute + verify(mgr, never()).startDistributing(null); // should publish an offline message Offline msg = captureAdminMessage(Offline.class); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index 80778ed4..7cd37581 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -257,8 +257,8 @@ public class QueryStateTest extends BasicStateTester { assertEquals(next, timer.second().fire()); - // should stop distributing - verify(mgr).startDistributing(null); + // should continue distributing + verify(mgr, never()).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index 01f49b55..18f12ff8 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -111,7 +111,7 @@ public class StartStateTest extends BasicStateTester { assertEquals(next, checker.second().fire()); - verify(mgr).internalTopicFailed(); + verify(mgr).startDistributing(null); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index cdf9b59a..e3d383d9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -423,7 +423,8 @@ public class StateTest extends BasicStateTester { State next2 = state.missedHeartbeat(); assertEquals(next, next2); - verify(mgr).startDistributing(null); + // should continue to distribute + verify(mgr, never()).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); @@ -437,7 +438,8 @@ public class StateTest extends BasicStateTester { State next2 = state.internalTopicFailed(); assertEquals(next, next2); - verify(mgr).internalTopicFailed(); + // should stop distributing + verify(mgr).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); |