aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java')
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java995
1 files changed, 995 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
new file mode 100644
index 00000000..ac60ae27
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -0,0 +1,995 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.system.PolicyController;
+
+class PoolingManagerImplTest {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+ protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
+
+ private static final String MY_HOST = "my.host";
+ private static final String HOST2 = "other.host";
+
+ private static final String MY_CONTROLLER = "my.controller";
+ private static final String MY_TOPIC = "my.topic";
+
+ private static final String TOPIC2 = "topic.two";
+
+ private static final String THE_EVENT = "the event";
+
+ private static final Object DECODED_EVENT = new Object();
+
+ /**
+ * Number of publish() invocations that should be issued when the manager is
+ * started.
+ */
+ private static final int START_PUB = 1;
+
+ /**
+ * Futures that have been allocated due to calls to scheduleXxx().
+ */
+ private Queue<ScheduledFuture<?>> futures;
+
+ private PoolingProperties poolProps;
+ private ListeningController controller;
+ private TopicMessageManager topicMessageManager;
+ private boolean gotManager;
+ private ScheduledThreadPoolExecutor sched;
+ private int schedCount;
+ private DroolsController drools;
+ private Serializer ser;
+ private CountDownLatch active;
+
+ private PoolingManagerImpl mgr;
+
+ /**
+ * Setup.
+ *
+ * @throws Exception throws exception
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ Properties plainProps = new Properties();
+
+ poolProps = mock(PoolingProperties.class);
+ when(poolProps.getSource()).thenReturn(plainProps);
+ when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
+ when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+ when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
+
+ futures = new LinkedList<>();
+ ser = new Serializer();
+ active = new CountDownLatch(1);
+
+ topicMessageManager = mock(TopicMessageManager.class);
+ gotManager = false;
+ controller = mock(ListeningController.class);
+ sched = mock(ScheduledThreadPoolExecutor.class);
+ schedCount = 0;
+ drools = mock(DroolsController.class);
+
+ when(controller.getName()).thenReturn(MY_CONTROLLER);
+ when(controller.getDrools()).thenReturn(drools);
+ when(controller.isAlive()).thenReturn(true);
+
+ when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
+ .thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
+ }
+
+ @Test
+ void testPoolingManagerImpl() {
+ assertTrue(gotManager);
+
+ State st = mgr.getCurrent();
+ assertInstanceOf(IdleState.class, st);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ void testPoolingManagerImpl_PoolEx() {
+ // throw an exception when we try to create the topic messages manager
+ PoolingFeatureException ex = new PoolingFeatureException();
+
+ assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ throw ex;
+ }
+ }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
+ }
+
+ @Test
+ void testGetCurrent() throws Exception {
+ assertEquals(IdleState.class, mgr.getCurrent().getClass());
+
+ startMgr();
+
+ assertEquals(StartState.class, mgr.getCurrent().getClass());
+ }
+
+ @Test
+ void testGetHost() {
+ assertEquals(MY_HOST, mgr.getHost());
+
+ mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
+ assertEquals(HOST2, mgr.getHost());
+ }
+
+ @Test
+ void testGetTopic() {
+ assertEquals(MY_TOPIC, mgr.getTopic());
+ }
+
+ @Test
+ void testGetProperties() {
+ assertEquals(poolProps, mgr.getProperties());
+ }
+
+ @Test
+ void testBeforeStart() {
+ // not running yet
+ mgr.beforeStart();
+
+ verify(topicMessageManager).startPublisher();
+
+ assertEquals(1, schedCount);
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+
+ // try again - nothing should happen
+ mgr.beforeStart();
+
+ verify(topicMessageManager).startPublisher();
+
+ assertEquals(1, schedCount);
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+
+ @Test
+ void testAfterStart() throws Exception {
+ startMgr();
+
+ verify(topicMessageManager).startConsumer(mgr);
+
+ State st = mgr.getCurrent();
+ assertInstanceOf(StartState.class, st);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+ verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+
+ // already started - nothing else happens
+ mgr.afterStart();
+
+ verify(topicMessageManager).startConsumer(mgr);
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+
+ verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
+ }
+
+ @Test
+ void testBeforeStop() throws Exception {
+ startMgr();
+ mgr.startDistributing(makeAssignments(false));
+
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+
+ mgr.beforeStop();
+
+ verify(topicMessageManager).stopConsumer(mgr);
+ verify(sched).shutdownNow();
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+ verify(topicMessageManager).publish(contains("offline"));
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+
+ // verify that next message is handled locally
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+ }
+
+ @Test
+ void testBeforeStop_NotRunning() {
+ final State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ verify(topicMessageManager, never()).stopConsumer(any());
+ verify(sched, never()).shutdownNow();
+
+ // hasn't changed states either
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ void testBeforeStop_AfterPartialStart() {
+ // call beforeStart but not afterStart
+ mgr.beforeStart();
+
+ final State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ // should still shut the scheduler down
+ verify(sched).shutdownNow();
+
+ verify(topicMessageManager, never()).stopConsumer(any());
+
+ // hasn't changed states
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterStop() throws Exception {
+ startMgr();
+ mgr.beforeStop();
+
+ mgr.afterStop();
+
+ verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
+ }
+
+ @Test
+ void testBeforeLock() throws Exception {
+ startMgr();
+
+ mgr.beforeLock();
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_AliveIdle() {
+ // this really shouldn't happen
+
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ // stays in idle state, because it has no scheduler
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_AliveStarted() throws Exception {
+ startMgr();
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_StoppedIdle() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_StoppedStarted() throws Exception {
+ startMgr();
+
+ // Note: don't lockMgr()
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testChangeState() throws Exception {
+ // start should invoke changeState()
+ startMgr();
+
+ /*
+ * now go offline while it's locked
+ */
+ lockMgr();
+
+ // should have cancelled the timers
+ assertEquals(2, futures.size());
+ verify(futures.poll()).cancel(false);
+ verify(futures.poll()).cancel(false);
+
+ /*
+ * now go back online
+ */
+ unlockMgr();
+
+ // new timers should now be active
+ assertEquals(2, futures.size());
+ verify(futures.poll(), never()).cancel(false);
+ verify(futures.poll(), never()).cancel(false);
+ }
+
+ @Test
+ void testSchedule() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testScheduleWithFixedDelay() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+ unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testPublishAdmin() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publishAdmin(msg);
+
+ assertEquals(Message.ADMIN, msg.getChannel());
+
+ verify(topicMessageManager).publish(any());
+ }
+
+ @Test
+ void testPublish() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publish("my.channel", msg);
+
+ assertEquals("my.channel", msg.getChannel());
+
+ verify(topicMessageManager).publish(any());
+ }
+
+ @Test
+ void testPublish_InvalidMsg() throws Exception {
+ // message is missing data
+ mgr.publish(Message.ADMIN, new Offline());
+
+ // should not have attempted to publish it
+ verify(topicMessageManager, never()).publish(any());
+ }
+
+ @Test
+ void testPublish_TopicMessageMngEx() throws Exception {
+
+ // generate exception
+ doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
+
+ assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testOnTopicEvent() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testOnTopicEvent_NullEvent() throws Exception {
+ startMgr();
+
+ assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testBeforeOffer_Unlocked() throws Exception {
+ startMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testBeforeOffer_Locked() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testBeforeInsert() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
+ validateHandleReqId(null);
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
+ validateHandleReqId("");
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
+ startMgr();
+
+ assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString() throws Exception {
+ validateUnhandled();
+ }
+
+ @Test
+ void testHandleExternalForward_NoAssignments() throws Exception {
+ validateUnhandled();
+ }
+
+ @Test
+ void testHandleExternalForward() throws Exception {
+ validateNoForward();
+ }
+
+ @Test
+ void testHandleEvent_NullTarget() throws Exception {
+ // buckets have null targets
+ validateDiscarded(new BucketAssignments(new String[] {null, null}));
+ }
+
+ @Test
+ void testHandleEvent_SameHost() throws Exception {
+ validateNoForward();
+ }
+
+ @Test
+ void testHandleEvent_DiffHost() throws Exception {
+ // route the message to the *OTHER* host
+ validateDiscarded(makeAssignments(false));
+ }
+
+ @Test
+ void testDecodeEvent_CannotDecode() throws Exception {
+
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
+ return false;
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_UnsuppEx() throws Exception {
+
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_ArgEx() throws Exception {
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new IllegalArgumentException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_StateEx() throws Exception {
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new IllegalStateException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // route to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testHandleInternal() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testHandleInternal_IoEx() throws Exception {
+ startMgr();
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testHandleInternal_PoolEx() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+
+ /*
+ * do NOT set the channel - this will cause the message to be invalid, triggering
+ * an exception
+ */
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testStartDistributing() throws Exception {
+ validateNoForward();
+
+
+ // null assignments should cause message to be processed locally
+ mgr.startDistributing(null);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+
+
+ // message for this host
+ mgr.startDistributing(makeAssignments(true));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+
+
+ // message for another host
+ mgr.startDistributing(makeAssignments(false));
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ @Test
+ void testGoStart() {
+ State st = mgr.goStart();
+ assertInstanceOf(StartState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ void testGoQuery() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goQuery();
+
+ assertInstanceOf(QueryState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ }
+
+ @Test
+ void testGoActive() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goActive();
+
+ assertInstanceOf(ActiveState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ assertEquals(0, active.getCount());
+ }
+
+ @Test
+ void testGoInactive() {
+ State st = mgr.goInactive();
+ assertInstanceOf(InactiveState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(1, active.getCount());
+ }
+
+ @Test
+ void testTimerActionRun() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testTimerActionRun_DiffState() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // give it a heartbeat so that it transitions to the query state
+ StartState st = (StartState) mgr.getCurrent();
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+
+ // execute it
+ taskCap.getValue().run();
+
+ // it should NOT have counted down
+ assertEquals(1, latch.getCount());
+ }
+
+ private void validateHandleReqId(String requestId) throws PoolingFeatureException {
+ startMgr();
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ private void validateNoForward() throws PoolingFeatureException {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+ }
+
+ private void validateUnhandled() throws PoolingFeatureException {
+ startMgr();
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
+ startMgr();
+
+ // buckets have null targets
+ mgr.startDistributing(bucketAssignments);
+
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ /**
+ * Makes an assignment with two buckets.
+ *
+ * @param sameHost {@code true} if the REQUEST_ID should hash to the
+ * manager's bucket, {@code false} if it should hash to the other host's bucket
+ * @return a new bucket assignment
+ */
+ private BucketAssignments makeAssignments(boolean sameHost) {
+ int slot = DECODED_EVENT.hashCode() % 2;
+
+ // slot numbers are 0 and 1 - reverse them if it's for a different host
+ if (!sameHost) {
+ slot = 1 - slot;
+ }
+
+ String[] asgn = new String[2];
+ asgn[slot] = mgr.getHost();
+ asgn[1 - slot] = HOST2;
+
+ return new BucketAssignments(asgn);
+ }
+
+ /**
+ * Invokes methods necessary to start the manager.
+ *
+ */
+ private void startMgr() {
+ mgr.beforeStart();
+ mgr.afterStart();
+ }
+
+ /**
+ * Invokes methods necessary to lock the manager.
+ */
+ private void lockMgr() {
+ mgr.beforeLock();
+ when(controller.isLocked()).thenReturn(true);
+ }
+
+ /**
+ * Invokes methods necessary to unlock the manager.
+ */
+ private void unlockMgr() {
+ mgr.afterUnlock();
+ when(controller.isLocked()).thenReturn(false);
+ }
+
+ /**
+ * Used to create a mock object that implements both super interfaces.
+ */
+ private static interface ListeningController extends TopicListener, PolicyController {
+
+ }
+
+ /**
+ * Manager with overrides.
+ */
+ private class PoolingManagerTest extends PoolingManagerImpl {
+
+ public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+
+ super(host, controller, props, activeLatch);
+ }
+
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ gotManager = true;
+ return topicMessageManager;
+ }
+
+ @Override
+ protected ScheduledThreadPoolExecutor makeScheduler() {
+ ++schedCount;
+ return sched;
+ }
+
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
+ return (drools2 == drools && TOPIC2.equals(topic2));
+ }
+
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
+ return DECODED_EVENT;
+ } else {
+ return null;
+ }
+ }
+ }
+}