diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state')
9 files changed, 2482 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java new file mode 100644 index 00000000..7997a4ee --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -0,0 +1,441 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +public class ActiveStateTest extends BasicStateTester { + + private ActiveState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new ActiveState(mgr); + } + + @Test + public void testStart() { + state.start(); + + // ensure the timers were created + verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class)); + + // ensure a heart beat was generated + Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); + assertEquals(MY_HOST, msg.second.getSource()); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testProcessHeartbeat_NullHost() { + assertNull(state.process(new Heartbeat())); + + assertFalse(state.isMyHeartbeatSeen()); + assertFalse(state.isPredHeartbeatSeen()); + + verify(mgr, never()).goInactive(); + verify(mgr, never()).goQuery(); + } + + @Test + public void testProcessHeartbeat_MyHost() { + assertNull(state.process(new Heartbeat(MY_HOST, 0L))); + + assertTrue(state.isMyHeartbeatSeen()); + assertFalse(state.isPredHeartbeatSeen()); + + verify(mgr, never()).goInactive(); + verify(mgr, never()).goQuery(); + } + + @Test + public void testProcessHeartbeat_Predecessor() { + assertNull(state.process(new Heartbeat(HOST2, 0L))); + + assertFalse(state.isMyHeartbeatSeen()); + assertTrue(state.isPredHeartbeatSeen()); + + verify(mgr, never()).goInactive(); + verify(mgr, never()).goQuery(); + } + + @Test + public void testProcessHeartbeat_OtherHost() { + assertNull(state.process(new Heartbeat(HOST3, 0L))); + + assertFalse(state.isMyHeartbeatSeen()); + assertFalse(state.isPredHeartbeatSeen()); + + verify(mgr, never()).goInactive(); + verify(mgr, never()).goQuery(); + } + + @Test + public void testProcessOffline_NullHost() { + // should be ignored + assertNull(state.process(new Offline())); + } + + @Test + public void testProcessOffline_UnassignedHost() { + // HOST4 is not in the assignment list - should be ignored + assertNull(state.process(new Offline(HOST4))); + } + + @Test + public void testProcessOffline_IAmLeader() { + // configure the next state + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + // one of the assigned hosts went offline + assertEquals(next, state.process(new Offline(HOST1))); + + // should have sent a new Leader message + Leader msg = captureAdminMessage(Leader.class); + + assertEquals(MY_HOST, msg.getSource()); + + // check new bucket assignments + assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray())); + } + + @Test + public void testProcessOffline_PredecessorIsLeaderNowOffline() { + // configure the next state + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + // I am not the leader, but my predecessor was + mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1})); + state = new ActiveState(mgr); + + // my predecessor went offline + assertEquals(next, state.process(new Offline(PREV_HOST))); + + // should have sent a new Leader message + Leader msg = captureAdminMessage(Leader.class); + + assertEquals(MY_HOST, msg.getSource()); + + // check new bucket assignments + assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray())); + } + + @Test + public void testProcessOffline__PredecessorIsNotLeaderNowOffline() { + // I am not the leader, and neither is my predecessor + mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2})); + state = new ActiveState(mgr); + + /* + * + * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader + * thus should be ignored. + */ + assertNull(state.process(new Offline(PREV_HOST2))); + } + + @Test + public void testProcessOffline_OtherAssignedHostOffline() { + // I am not the leader + mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1})); + state = new ActiveState(mgr); + + /* + * HOST1 has buckets, but it isn't the leader and it isn't my + * predecessor, thus should be ignored. + */ + assertNull(state.process(new Offline(HOST1))); + } + + @Test + public void testProcessQuery() { + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + assertEquals(next, state.process(new Query())); + + Identification ident = captureAdminMessage(Identification.class); + assertEquals(MY_HOST, ident.getSource()); + assertEquals(ASGN3, ident.getAssignments()); + } + + @Test + public void testActiveState() { + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + + // verify that it determined its neighbors + assertEquals(HOST1, state.getSuccHost()); + assertEquals(HOST2, state.getPredHost()); + } + + @Test + public void testDetmNeighbors() { + // if only one host (i.e., itself) + mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST})); + state = new ActiveState(mgr); + assertEquals(null, state.getSuccHost()); + assertEquals("", state.getPredHost()); + + // two hosts + mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2})); + state = new ActiveState(mgr); + assertEquals(HOST2, state.getSuccHost()); + assertEquals(HOST2, state.getPredHost()); + + // three hosts + mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2})); + state = new ActiveState(mgr); + assertEquals(HOST2, state.getSuccHost()); + assertEquals(HOST3, state.getPredHost()); + + // more hosts + mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4})); + state = new ActiveState(mgr); + assertEquals(HOST2, state.getSuccHost()); + assertEquals(HOST4, state.getPredHost()); + } + + @Test + public void testAddTimers_WithPredecessor() { + // invoke start() to add the timers + state.start(); + + assertEquals(3, repeatedFutures.size()); + + Triple<Long, Long, StateTimerTask> timer; + + // heart beat generator + timer = repeatedTasks.remove(); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + + // my heart beat checker + timer = repeatedTasks.remove(); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + + // predecessor's heart beat checker + timer = repeatedTasks.remove(); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + } + + @Test + public void testAddTimers_SansPredecessor() { + // only one host, thus no predecessor + mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST})); + state = new ActiveState(mgr); + + // invoke start() to add the timers + state.start(); + + assertEquals(2, repeatedFutures.size()); + + Triple<Long, Long, StateTimerTask> timer; + + // heart beat generator + timer = repeatedTasks.remove(); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + + // my heart beat checker + timer = repeatedTasks.remove(); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + } + + @Test + public void testAddTimers_HeartbeatGenerator() { + // only one host so we only have to look at one heart beat at a time + mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST})); + state = new ActiveState(mgr); + + // invoke start() to add the timers + state.start(); + + Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove(); + + verify(mgr).publish(anyString(), any(Heartbeat.class)); + + // fire the task + assertNull(task.third.fire(null)); + + // should have generated a second pair of heart beats + verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class)); + + Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); + assertEquals(MY_HOST, msg.first); + assertEquals(MY_HOST, msg.second.getSource()); + } + + @Test + public void testAddTimers_MyHeartbeatSeen() { + // invoke start() to add the timers + state.start(); + + Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1); + + // indicate that this host is still alive + state.process(new Heartbeat(MY_HOST, 0L)); + + // set up next state + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + // fire the task - should not transition + assertNull(task.third.fire(null)); + + verify(mgr, never()).publishAdmin(any(Query.class)); + } + + @Test + public void testAddTimers_MyHeartbeatMissed() { + // invoke start() to add the timers + state.start(); + + Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1); + + // set up next state + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + // fire the task - should transition + assertEquals(next, task.third.fire(null)); + + // should indicate failure + verify(mgr).internalTopicFailed(); + + // should publish an offline message + Offline msg = captureAdminMessage(Offline.class); + assertEquals(MY_HOST, msg.getSource()); + } + + @Test + public void testAddTimers_PredecessorHeartbeatSeen() { + // invoke start() to add the timers + state.start(); + + Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2); + + // indicate that the predecessor is still alive + state.process(new Heartbeat(HOST2, 0L)); + + // set up next state, just in case + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + // fire the task - should NOT transition + assertNull(task.third.fire(null)); + + verify(mgr, never()).publishAdmin(any(Query.class)); + } + + @Test + public void testAddTimers_PredecessorHeartbeatMissed() { + // invoke start() to add the timers + state.start(); + + Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2); + + // set up next state + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + // fire the task - should transition + assertEquals(next, task.third.fire(null)); + + verify(mgr).publishAdmin(any(Query.class)); + } + + @Test + public void testGenHeartbeat_OneHost() { + // only one host (i.e., itself) + mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST})); + state = new ActiveState(mgr); + + state.start(); + + verify(mgr, times(1)).publish(any(), any()); + + Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); + assertEquals(MY_HOST, msg.first); + assertEquals(MY_HOST, msg.second.getSource()); + } + + @Test + public void testGenHeartbeat_MultipleHosts() { + state.start(); + + verify(mgr, times(2)).publish(any(), any()); + + Pair<String, Heartbeat> msg; + int index = 0; + + // this message should go to itself + msg = capturePublishedMessage(Heartbeat.class, index++); + assertEquals(MY_HOST, msg.first); + assertEquals(MY_HOST, msg.second.getSource()); + + // this message should go to its successor + msg = capturePublishedMessage(Heartbeat.class, index++); + assertEquals(HOST1, msg.first); + assertEquals(MY_HOST, msg.second.getSource()); + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java new file mode 100644 index 00000000..e48742f7 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java @@ -0,0 +1,318 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.PoolingProperties; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; + +/** + * Superclass used to test subclasses of {@link Message}. + */ +public class BasicStateTester { + + protected static final long STD_HEARTBEAT_WAIT_MS = 10; + protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1; + protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1; + protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1; + protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1; + + protected static final String MY_TOPIC = "myTopic"; + + protected static final String PREV_HOST = "prevHost"; + protected static final String PREV_HOST2 = PREV_HOST + "A"; + + // this follows PREV_HOST, alphabetically + protected static final String MY_HOST = PREV_HOST + "X"; + + // these follow MY_HOST, alphabetically + protected static final String HOST1 = MY_HOST + "1"; + protected static final String HOST2 = MY_HOST + "2"; + protected static final String HOST3 = MY_HOST + "3"; + protected static final String HOST4 = MY_HOST + "4"; + + protected static final String LEADER = HOST1; + + protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2}; + + protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments(); + protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3); + + /** + * Futures returned by schedule(). + */ + protected LinkedList<ScheduledFuture<?>> onceFutures; + + /** + * Tasks captured via schedule(). + */ + protected LinkedList<Pair<Long, StateTimerTask>> onceTasks; + + /** + * Futures returned by scheduleWithFixedDelay(). + */ + protected LinkedList<ScheduledFuture<?>> repeatedFutures; + + /** + * Tasks captured via scheduleWithFixedDelay(). + */ + protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks; + + /** + * Messages captured via publish(). + */ + protected LinkedList<Pair<String, Message>> published; + + /** + * Messages captured via publishAdmin(). + */ + protected LinkedList<Message> admin; + + protected PoolingManager mgr; + protected PoolingProperties props; + protected State prevState; + + public BasicStateTester() { + super(); + } + + public void setUp() throws Exception { + onceFutures = new LinkedList<>(); + onceTasks = new LinkedList<>(); + + repeatedFutures = new LinkedList<>(); + repeatedTasks = new LinkedList<>(); + + published = new LinkedList<>(); + admin = new LinkedList<>(); + + mgr = mock(PoolingManager.class); + props = mock(PoolingProperties.class); + + when(mgr.getHost()).thenReturn(MY_HOST); + when(mgr.getTopic()).thenReturn(MY_TOPIC); + when(mgr.getProperties()).thenReturn(props); + + when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS); + when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS); + when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS); + when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS); + when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS); + + prevState = new State(mgr) { + @Override + public Map<String, Object> getFilter() { + throw new UnsupportedOperationException("cannot filter"); + } + }; + + // capture publish() arguments + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + published.add(new Pair<>((String) args[0], (Message) args[1])); + + return null; + }).when(mgr).publish(anyString(), any(Message.class)); + + // capture publishAdmin() arguments + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + admin.add((Message) args[0]); + + return null; + }).when(mgr).publishAdmin(any(Message.class)); + + // capture schedule() arguments, and return a new future + when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1])); + + ScheduledFuture<?> fut = mock(ScheduledFuture.class); + onceFutures.add(fut); + return fut; + }); + + // capture scheduleWithFixedDelay() arguments, and return a new future + when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2])); + + ScheduledFuture<?> fut = mock(ScheduledFuture.class); + repeatedFutures.add(fut); + return fut; + }); + + // get/set assignments in the manager + AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3); + + when(mgr.getAssignments()).thenAnswer(args -> asgn.get()); + + doAnswer(args -> { + asgn.set(args.getArgumentAt(0, BucketAssignments.class)); + return null; + }).when(mgr).startDistributing(any()); + } + + /** + * Makes a sorted set of hosts. + * + * @param hosts the hosts to be sorted + * @return the set of hosts, sorted + */ + protected SortedSet<String> sortHosts(String... hosts) { + return new TreeSet<>(Arrays.asList(hosts)); + } + + /** + * Captures the host array from the Leader message published to the admin + * channel. + * + * @return the host array, as a list + */ + protected List<String> captureHostList() { + return Arrays.asList(captureHostArray()); + } + + /** + * Captures the host array from the Leader message published to the admin + * channel. + * + * @return the host array + */ + protected String[] captureHostArray() { + BucketAssignments asgn = captureAssignments(); + + String[] arr = asgn.getHostArray(); + assertNotNull(arr); + + return arr; + } + + /** + * Captures the assignments from the Leader message published to the admin + * channel. + * + * @return the bucket assignments + */ + protected BucketAssignments captureAssignments() { + Leader msg = captureAdminMessage(Leader.class); + + BucketAssignments asgn = msg.getAssignments(); + assertNotNull(asgn); + return asgn; + } + + /** + * Captures the message published to the admin channel. + * + * @param clazz type of {@link Message} to capture + * @return the message that was published + */ + protected <T extends Message> T captureAdminMessage(Class<T> clazz) { + return captureAdminMessage(clazz, 0); + } + + /** + * Captures the message published to the admin channel. + * + * @param clazz type of {@link Message} to capture + * @param index index of the item to be captured + * @return the message that was published + */ + protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) { + return clazz.cast(admin.get(index)); + } + + /** + * Captures the message published to the non-admin channels. + * + * @param clazz type of {@link Message} to capture + * @return the (channel,message) pair that was published + */ + protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) { + return capturePublishedMessage(clazz, 0); + } + + /** + * Captures the message published to the non-admin channels. + * + * @param clazz type of {@link Message} to capture + * @param index index of the item to be captured + * @return the (channel,message) pair that was published + */ + protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) { + Pair<String, Message> msg = published.get(index); + return new Pair<>(msg.first, clazz.cast(msg.second)); + } + + /** + * Pair of values. + * + * @param <F> first value's type + * @param <S> second value's type + */ + public static class Pair<F, S> { + public final F first; + public final S second; + + public Pair(F first, S second) { + this.first = first; + this.second = second; + } + } + + /** + * Pair of values. + * + * @param <F> first value's type + * @param <S> second value's type + * @param <T> third value's type + */ + public static class Triple<F, S, T> { + public final F first; + public final S second; + public final T third; + + public Triple(F first, S second, T third) { + this.first = first; + this.second = second; + this.third = third; + } + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java new file mode 100644 index 00000000..ba517194 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND; +import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS; +import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR; +import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS; +import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD; +import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS; +import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE; +import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd; +import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals; +import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; +import java.util.Map; +import org.junit.Test; + +public class FilterUtilsTest { + + @Test + public void testMakeEquals() { + checkEquals("abc", "def", makeEquals("abc", "def")); + } + + @Test + public void testMakeAnd() { + @SuppressWarnings("unchecked") + Map<String, Object> filter = + makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3")); + + checkArray(CLASS_AND, 3, filter); + checkEquals("an1", "av1", getItem(filter, 0)); + checkEquals("an2", "av2", getItem(filter, 1)); + checkEquals("an3", "av3", getItem(filter, 2)); + } + + @Test + public void testMakeOr() { + @SuppressWarnings("unchecked") + Map<String, Object> filter = + makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3")); + + checkArray(CLASS_OR, 3, filter); + checkEquals("on1", "ov1", getItem(filter, 0)); + checkEquals("on2", "ov2", getItem(filter, 1)); + checkEquals("on3", "ov3", getItem(filter, 2)); + } + + /** + * Checks that the filter contains an array. + * + * @param expectedClassName type of filter this should represent + * @param expectedCount number of items expected in the array + * @param filter filter to be examined + */ + protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) { + assertEquals(expectedClassName, filter.get(JSON_CLASS)); + + Object[] val = (Object[]) filter.get(JSON_FILTERS); + assertEquals(expectedCount, val.length); + } + + /** + * Checks that a map represents an "equals". + * + * @param name name of the field on the left side of the equals + * @param value value on the right side of the equals + * @param map map whose content is to be examined + */ + protected void checkEquals(String name, String value, Map<String, Object> map) { + assertEquals(CLASS_EQUALS, map.get(JSON_CLASS)); + assertEquals(name, map.get(JSON_FIELD)); + assertEquals(value, map.get(JSON_VALUE)); + } + + /** + * Gets a particular sub-filter from the array contained within a filter. + * + * @param filter containing filter + * @param index index of the sub-filter of interest + * @return the sub-filter with the given index + */ + @SuppressWarnings("unchecked") + protected Map<String, Object> getItem(Map<String, Object> filter, int index) { + Object[] val = (Object[]) filter.get(JSON_FILTERS); + + return (Map<String, Object>) val[index]; + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java new file mode 100644 index 00000000..96c59719 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java @@ -0,0 +1,121 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Forward; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +public class IdleStateTest extends BasicStateTester { + + private IdleState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new IdleState(mgr); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testStop() { + state.stop(); + verifyNothingPublished(); + } + + @Test + public void testProcessForward() { + Forward msg = new Forward(); + assertNull(state.process(msg)); + + verify(mgr).handle(msg); + } + + @Test + public void testProcessHeartbeat() { + assertNull(state.process(new Heartbeat(PREV_HOST, 0L))); + verifyNothingPublished(); + } + + @Test + public void testProcessIdentification() { + assertNull(state.process(new Identification(PREV_HOST, null))); + verifyNothingPublished(); + } + + @Test + public void testProcessLeader() { + BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, PREV_HOST, MY_HOST}); + Leader msg = new Leader(PREV_HOST, asgn); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + // should stay in current state, but start distributing + assertNull(state.process(msg)); + verify(mgr).startDistributing(asgn); + } + + @Test + public void testProcessOffline() { + assertNull(state.process(new Offline(PREV_HOST))); + verifyNothingPublished(); + } + + @Test + public void testProcessQuery() { + assertNull(state.process(new Query())); + verifyNothingPublished(); + } + + /** + * Verifies that nothing was published on either channel. + */ + private void verifyNothingPublished() { + verify(mgr, never()).publish(any(), any()); + verify(mgr, never()).publishAdmin(any()); + } +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java new file mode 100644 index 00000000..48d5b1ed --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.Message; + +public class InactiveStateTest extends BasicStateTester { + + private InactiveState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new InactiveState(mgr); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testGoInatcive() { + assertNull(state.goInactive()); + } + + @Test + public void testStart() { + state.start(); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue()); + + // invoke the task - it should go to the state returned by the mgr + State next = mock(State.class); + when(mgr.goStart()).thenReturn(next); + + assertEquals(next, timer.second.fire(null)); + } + + @Test + public void testInactiveState() { + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java new file mode 100644 index 00000000..d60ad2ea --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java @@ -0,0 +1,328 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Query; + +public class ProcessingStateTest extends BasicStateTester { + + private ProcessingState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new ProcessingState(mgr, MY_HOST); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testProcessQuery() { + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + assertEquals(next, state.process(new Query())); + + Identification ident = captureAdminMessage(Identification.class); + assertEquals(MY_HOST, ident.getSource()); + assertEquals(ASGN3, ident.getAssignments()); + } + + @Test + public void testProcessingState() { + /* + * Null assignments should be OK. + */ + when(mgr.getAssignments()).thenReturn(null); + state = new ProcessingState(mgr, LEADER); + + /* + * Empty assignments should be OK. + */ + when(mgr.getAssignments()).thenReturn(EMPTY_ASGN); + state = new ProcessingState(mgr, LEADER); + assertEquals(MY_HOST, state.getHost()); + assertEquals(LEADER, state.getLeader()); + assertEquals(EMPTY_ASGN, state.getAssignments()); + + /* + * Now try something with assignments. + */ + when(mgr.getAssignments()).thenReturn(ASGN3); + state = new ProcessingState(mgr, LEADER); + + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + + assertEquals(LEADER, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test(expected = IllegalArgumentException.class) + public void testProcessingState_NullLeader() { + when(mgr.getAssignments()).thenReturn(EMPTY_ASGN); + state = new ProcessingState(mgr, null); + } + + @Test(expected = IllegalArgumentException.class) + public void testProcessingState_ZeroLengthHostArray() { + when(mgr.getAssignments()).thenReturn(new BucketAssignments(new String[] {})); + state = new ProcessingState(mgr, LEADER); + } + + @Test + public void testMakeIdentification() { + Identification ident = state.makeIdentification(); + assertEquals(MY_HOST, ident.getSource()); + assertEquals(ASGN3, ident.getAssignments()); + } + + @Test + public void testGetAssignments() { + // assignments from constructor + assertEquals(ASGN3, state.getAssignments()); + + // null assignments - no effect + state.setAssignments(null); + assertEquals(ASGN3, state.getAssignments()); + + // empty assignments + state.setAssignments(EMPTY_ASGN); + assertEquals(EMPTY_ASGN, state.getAssignments()); + + // non-empty assignments + state.setAssignments(ASGN3); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testSetAssignments() { + state.setAssignments(null); + verify(mgr, never()).startDistributing(any()); + + state.setAssignments(ASGN3); + verify(mgr).startDistributing(ASGN3); + } + + @Test + public void testGetLeader() { + // check value from constructor + assertEquals(MY_HOST, state.getLeader()); + + state.setLeader(HOST2); + assertEquals(HOST2, state.getLeader()); + + state.setLeader(HOST3); + assertEquals(HOST3, state.getLeader()); + } + + @Test + public void testSetLeader() { + state.setLeader(MY_HOST); + assertEquals(MY_HOST, state.getLeader()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetLeader_Null() { + state.setLeader(null); + } + + @Test + public void testIsLeader() { + state.setLeader(MY_HOST); + assertTrue(state.isLeader()); + + state.setLeader(HOST2); + assertFalse(state.isLeader()); + } + + @Test + public void testBecomeLeader() { + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + assertEquals(next, state.becomeLeader(sortHosts(MY_HOST, HOST2))); + + Leader msg = captureAdminMessage(Leader.class); + + verify(mgr).startDistributing(msg.getAssignments()); + verify(mgr).goActive(); + } + + @Test(expected = IllegalArgumentException.class) + public void testBecomeLeader_NotFirstAlive() { + // alive list contains something before my host name + state.becomeLeader(sortHosts(PREV_HOST, MY_HOST)); + } + + @Test + public void testMakeLeader() throws Exception { + state.becomeLeader(sortHosts(MY_HOST, HOST2)); + + Leader msg = captureAdminMessage(Leader.class); + + // need a channel before invoking checkValidity() + msg.setChannel(Message.ADMIN); + + msg.checkValidity(); + + assertEquals(MY_HOST, msg.getSource()); + assertNotNull(msg.getAssignments()); + assertTrue(msg.getAssignments().hasAssignment(MY_HOST)); + assertTrue(msg.getAssignments().hasAssignment(HOST2)); + + // this one wasn't in the list of hosts, so it should have been removed + assertFalse(msg.getAssignments().hasAssignment(HOST1)); + } + + @Test + public void testMakeAssignments() throws Exception { + state.becomeLeader(sortHosts(MY_HOST, HOST2)); + + captureAssignments().checkValidity(); + } + + @Test + public void testMakeBucketArray_NullAssignments() { + when(mgr.getAssignments()).thenReturn(null); + state = new ProcessingState(mgr, MY_HOST); + state.becomeLeader(sortHosts(MY_HOST)); + + String[] arr = captureHostArray(); + + assertEquals(BucketAssignments.MAX_BUCKETS, arr.length); + + assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host))); + } + + @Test + public void testMakeBucketArray_ZeroAssignments() { + // bucket assignment with a zero-length array + state.setAssignments(new BucketAssignments(new String[0])); + + state.becomeLeader(sortHosts(MY_HOST)); + + String[] arr = captureHostArray(); + + assertEquals(BucketAssignments.MAX_BUCKETS, arr.length); + + assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host))); + } + + @Test + public void testMakeBucketArray() { + /* + * All hosts are still alive, so it should have the exact same + * assignments as it had to start. + */ + state.setAssignments(ASGN3); + state.becomeLeader(sortHosts(HOST_ARR3)); + + String[] arr = captureHostArray(); + + assertTrue(arr != HOST_ARR3); + assertEquals(Arrays.asList(HOST_ARR3), Arrays.asList(arr)); + } + + @Test + public void testRemoveExcessHosts() { + /** + * All hosts are still alive, plus some others. + */ + state.setAssignments(ASGN3); + state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3, HOST4)); + + // assignments should be unchanged + assertEquals(Arrays.asList(HOST_ARR3), captureHostList()); + } + + @Test + public void testAddIndicesToHostBuckets() { + // some are null, some hosts are no longer alive + String[] asgn = {null, MY_HOST, HOST3, null, HOST4, HOST1, HOST2}; + + state.setAssignments(new BucketAssignments(asgn)); + state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2)); + + // every bucket should be assigned to one of the three hosts + String[] expected = {MY_HOST, MY_HOST, HOST1, HOST2, MY_HOST, HOST1, HOST2}; + assertEquals(Arrays.asList(expected), captureHostList()); + } + + @Test + public void testAssignNullBuckets() { + /* + * Ensure buckets are assigned to the host with the fewest buckets. + */ + String[] asgn = {MY_HOST, HOST1, MY_HOST, null, null, null, null, null, MY_HOST}; + + state.setAssignments(new BucketAssignments(asgn)); + state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2)); + + String[] expected = {MY_HOST, HOST1, MY_HOST, HOST2, HOST1, HOST2, HOST1, HOST2, MY_HOST}; + assertEquals(Arrays.asList(expected), captureHostList()); + } + + @Test + public void testRebalanceBuckets() { + /** + * Some are very lopsided. + */ + String[] asgn = {MY_HOST, HOST1, MY_HOST, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3}; + + state.setAssignments(new BucketAssignments(asgn)); + state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3)); + + String[] expected = {HOST2, HOST1, HOST3, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3}; + assertEquals(Arrays.asList(expected), captureHostList()); + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java new file mode 100644 index 00000000..d714d5cc --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -0,0 +1,462 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +public class QueryStateTest extends BasicStateTester { + + private QueryState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new QueryState(mgr); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testStart() { + state.start(); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); + assertNotNull(timer.second); + } + + @Test + public void testGoQuery() { + assertNull(state.process(new Query())); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessIdentification_NullSource() { + assertNull(state.process(new Identification())); + + assertEquals(MY_HOST, state.getLeader()); + } + + @Test + public void testProcessIdentification_NewLeader() { + assertNull(state.process(new Identification(PREV_HOST, null))); + + assertEquals(PREV_HOST, state.getLeader()); + } + + @Test + public void testProcessIdentification_NotNewLeader() { + assertNull(state.process(new Identification(HOST2, null))); + + assertEquals(MY_HOST, state.getLeader()); + } + + @Test + public void testProcessLeader_NullAssignment() { + Leader msg = new Leader(PREV_HOST, null); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_NullSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(null, asgn); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_SourceIsNotAssignmentLeader() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(HOST2, asgn); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_EmptyAssignment() { + Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_BetterLeader() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(PREV_HOST, asgn); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessLeader_NotABetterLeader() { + // no assignments yet + mgr.startDistributing(null); + state = new QueryState(mgr); + + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + Leader msg = new Leader(HOST1, asgn); + + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + // should stay in the same state + assertNull(state.process(msg)); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // should have started distributing + verify(mgr).startDistributing(asgn); + + // this host should still be the leader + assertEquals(MY_HOST, state.getLeader()); + + // new assignments + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testProcessOffline_NullHost() { + assertNull(state.process(new Offline())); + assertEquals(MY_HOST, state.getLeader()); + } + + @Test + public void testProcessOffline_SameHost() { + assertNull(state.process(new Offline(MY_HOST))); + assertEquals(MY_HOST, state.getLeader()); + } + + @Test + public void testProcessOffline_DiffHost() { + BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1}); + mgr.startDistributing(asgn); + state = new QueryState(mgr); + + // tell it that the hosts are alive + state.process(new Identification(PREV_HOST, asgn)); + state.process(new Identification(HOST1, asgn)); + + // #2 goes offline + assertNull(state.process(new Offline(HOST1))); + + // #1 should still be the leader + assertEquals(PREV_HOST, state.getLeader()); + + // #1 goes offline + assertNull(state.process(new Offline(PREV_HOST))); + + // this should still be the leader now + assertEquals(MY_HOST, state.getLeader()); + } + + @Test + public void testProcessQuery() { + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + mgr.startDistributing(asgn); + state = new QueryState(mgr); + + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + assertEquals(null, state.process(new Query())); + + verify(mgr).publishAdmin(any(Identification.class)); + } + + @Test + public void testQueryState() { + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testAwaitIdentification_Leader() { + state.start(); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); + assertNotNull(timer.second); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + assertEquals(next, timer.second.fire(null)); + + // should have published a Leader message + Leader msg = captureAdminMessage(Leader.class); + assertEquals(MY_HOST, msg.getSource()); + assertTrue(msg.getAssignments().hasAssignment(MY_HOST)); + } + + @Test + public void testAwaitIdentification_HasAssignment() { + // not the leader, but has an assignment + BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2}); + mgr.startDistributing(asgn); + state = new QueryState(mgr); + + state.start(); + + // tell it the leader is still active + state.process(new Identification(PREV_HOST, asgn)); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); + assertNotNull(timer.second); + + // set up active state, as that's what it should return + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + assertEquals(next, timer.second.fire(null)); + + // should NOT have published a Leader message + assertTrue(admin.isEmpty()); + + // should have gone active with the current assignments + verify(mgr).goActive(); + } + + @Test + public void testAwaitIdentification_NoAssignment() { + // not the leader and no assignment + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + mgr.startDistributing(asgn); + state = new QueryState(mgr); + + state.start(); + + // tell it the leader is still active + state.process(new Identification(PREV_HOST, asgn)); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); + assertNotNull(timer.second); + + // set up inactive state, as that's what it should return + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + assertEquals(next, timer.second.fire(null)); + + // should NOT have published a Leader message + assertTrue(admin.isEmpty()); + } + + @Test + public void testHasAssignment() { + // null assignment + mgr.startDistributing(null); + assertFalse(state.hasAssignment()); + + // not in assignments + state.setAssignments(new BucketAssignments(new String[] {HOST3})); + assertFalse(state.hasAssignment()); + + // it IS in the assignments + state.setAssignments(new BucketAssignments(new String[] {MY_HOST})); + assertTrue(state.hasAssignment()); + } + + @Test + public void testRecordInfo_NullSource() { + state.setAssignments(ASGN3); + state.setLeader(MY_HOST); + + BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2}); + state.process(new Identification(null, asgn)); + + // leader unchanged + assertEquals(MY_HOST, state.getLeader()); + + // assignments still updated + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_SourcePreceedsMyHost() { + state.setAssignments(ASGN3); + state.setLeader(MY_HOST); + + BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2}); + state.process(new Identification(PREV_HOST, asgn)); + + // new leader + assertEquals(PREV_HOST, state.getLeader()); + + // assignments still updated + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_SourceFollowsMyHost() { + mgr.startDistributing(null); + state.setLeader(MY_HOST); + + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + state.process(new Identification(HOST1, asgn)); + + // leader unchanged + assertEquals(MY_HOST, state.getLeader()); + + // assignments still updated + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_NewIsNull() { + state.setAssignments(ASGN3); + state.process(new Identification(HOST1, null)); + + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testRecordInfo_NewIsEmpty() { + state.setAssignments(ASGN3); + state.process(new Identification(PREV_HOST, new BucketAssignments())); + + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testRecordInfo_OldIsNull() { + mgr.startDistributing(null); + + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + state.process(new Identification(HOST1, asgn)); + + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_OldIsEmpty() { + state.setAssignments(new BucketAssignments()); + + BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); + state.process(new Identification(HOST1, asgn)); + + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_NewLeaderPreceedsOld() { + state.setAssignments(ASGN3); + state.setLeader(MY_HOST); + + BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2}); + state.process(new Identification(HOST3, asgn)); + + assertEquals(asgn, state.getAssignments()); + } + + @Test + public void testRecordInfo_NewLeaderSucceedsOld() { + state.setAssignments(ASGN3); + state.setLeader(MY_HOST); + + BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3}); + state.process(new Identification(HOST3, asgn)); + + // should be unchanged + assertEquals(ASGN3, state.getAssignments()); + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java new file mode 100644 index 00000000..f29d2348 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -0,0 +1,180 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.message.Forward; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +public class StartStateTest extends BasicStateTester { + + private StartState state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new StartState(mgr); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + + // get the sub-filter + filter = utils.getItem(filter, 1); + + utils.checkArray(FilterUtils.CLASS_AND, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()), + utils.getItem(filter, 1)); + } + + @Test + public void testStart() { + state.start(); + + Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); + + assertEquals(MY_HOST, msg.first); + assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs()); + + Pair<Long, StateTimerTask> timer = onceTasks.removeFirst(); + + assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue()); + + // invoke the task - it should go to the state returned by the mgr + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + assertEquals(next, timer.second.fire(null)); + + verify(mgr).internalTopicFailed(); + } + + @Test + public void testStartStatePoolingManager() { + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testStartStateState() { + // create a new state from the current state + state = new StartState(mgr); + + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testProcessForward() { + assertNull(state.process(new Forward())); + } + + @Test + public void testProcessHeartbeat() { + Heartbeat msg = new Heartbeat(); + + // no matching data in heart beat + assertNull(state.process(msg)); + verify(mgr, never()).publishAdmin(any()); + + // same source, different time stamp + msg.setSource(MY_HOST); + msg.setTimestampMs(state.getHbTimestampMs() - 1); + assertNull(state.process(msg)); + verify(mgr, never()).publishAdmin(any()); + + // same time stamp, different source + msg.setSource("unknown"); + msg.setTimestampMs(state.getHbTimestampMs()); + assertNull(state.process(msg)); + verify(mgr, never()).publishAdmin(any()); + + // matching heart beat + msg.setSource(MY_HOST); + msg.setTimestampMs(state.getHbTimestampMs()); + + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + assertEquals(next, state.process(msg)); + + verify(mgr).publishAdmin(any(Query.class)); + } + + @Test + public void testProcessIdentification() { + assertNull(state.process(new Identification(MY_HOST, null))); + } + + @Test + public void testProcessLeader() { + assertNull(state.process(new Leader(MY_HOST, null))); + } + + @Test + public void testProcessOffline() { + assertNull(state.process(new Offline(HOST1))); + } + + @Test + public void testProcessQuery() { + assertNull(state.process(new Query())); + } + + @Test + public void testGetHbTimestampMs() { + long tcurrent = System.currentTimeMillis(); + assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent); + + tcurrent = System.currentTimeMillis(); + assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent); + } + +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java new file mode 100644 index 00000000..1be48e21 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -0,0 +1,440 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Forward; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +public class StateTest extends BasicStateTester { + + private State state; + + @Before + public void setUp() throws Exception { + super.setUp(); + + state = new MyState(mgr); + } + + @Test + public void testStatePoolingManager() { + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testStateState() { + // allocate a new state, copying from the old state + state = new MyState(mgr); + + /* + * Prove the state is attached to the manager by invoking getHost(), + * which delegates to the manager. + */ + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testCancelTimers() { + int delay = 100; + int initDelay = 200; + + /* + * Create three tasks tasks. + */ + + StateTimerTask task1 = mock(StateTimerTask.class); + StateTimerTask task2 = mock(StateTimerTask.class); + StateTimerTask task3 = mock(StateTimerTask.class); + + // two tasks via schedule() + state.schedule(delay, task1); + state.schedule(delay, task2); + + // one task via scheduleWithFixedDelay() + state.scheduleWithFixedDelay(initDelay, delay, task3); + + // ensure all were scheduled, but not yet canceled + verify(mgr).schedule(delay, task1); + verify(mgr).schedule(delay, task2); + verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3); + + ScheduledFuture<?> fut1 = onceFutures.removeFirst(); + ScheduledFuture<?> fut2 = onceFutures.removeFirst(); + ScheduledFuture<?> fut3 = repeatedFutures.removeFirst(); + + verify(fut1, never()).cancel(false); + verify(fut2, never()).cancel(false); + verify(fut3, never()).cancel(false); + + /* + * Cancel the timers. + */ + state.cancelTimers(); + + // verify that all were cancelled + verify(fut1).cancel(false); + verify(fut2).cancel(false); + verify(fut3).cancel(false); + } + + @Test + public void testGetFilter() { + Map<String, Object> filter = state.getFilter(); + + FilterUtilsTest utils = new FilterUtilsTest(); + + utils.checkArray(FilterUtils.CLASS_OR, 2, filter); + utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0)); + utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1)); + } + + @Test + public void testStart() { + state.start(); + } + + @Test + public void testStop() { + state.stop(); + + assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource()); + } + + @Test + public void testGoStart() { + State next = mock(State.class); + when(mgr.goStart()).thenReturn(next); + + State next2 = state.goStart(); + assertEquals(next, next2); + } + + @Test + public void testGoQuery() { + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + State next2 = state.goQuery(); + assertEquals(next, next2); + } + + @Test + public void testGoActive() { + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + State next2 = state.goActive(); + assertEquals(next, next2); + } + + @Test + public void testGoInactive() { + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + State next2 = state.goInactive(); + assertEquals(next, next2); + } + + @Test + public void testProcessForward() { + Forward msg = new Forward(); + assertNull(state.process(msg)); + + verify(mgr).handle(msg); + } + + @Test + public void testProcessHeartbeat() { + assertNull(state.process(new Heartbeat())); + } + + @Test + public void testProcessIdentification() { + assertNull(state.process(new Identification())); + } + + @Test + public void testProcessLeader_NullAssignment() { + Leader msg = new Leader(PREV_HOST, null); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessLeader_NullSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(null, asgn); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessLeader_EmptyAssignment() { + Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessLeader_MyHostAssigned() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(PREV_HOST, asgn); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessLeader_MyHostUnassigned() { + String[] arr = {HOST2, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(HOST1, asgn); + + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + // should go Inactive and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + verify(mgr, never()).goActive(); + } + + @Test + public void testProcessOffline() { + assertNull(state.process(new Offline())); + } + + @Test + public void testProcessQuery() { + assertNull(state.process(new Query())); + } + + @Test + public void testPublishIdentification() { + Identification msg = new Identification(); + state.publish(msg); + + verify(mgr).publishAdmin(msg); + } + + @Test + public void testPublishLeader() { + Leader msg = new Leader(); + state.publish(msg); + + verify(mgr).publishAdmin(msg); + } + + @Test + public void testPublishOffline() { + Offline msg = new Offline(); + state.publish(msg); + + verify(mgr).publishAdmin(msg); + } + + @Test + public void testPublishQuery() { + Query msg = new Query(); + state.publish(msg); + + verify(mgr).publishAdmin(msg); + } + + @Test + public void testPublishStringForward() { + String chnl = "channelF"; + Forward msg = new Forward(); + + state.publish(chnl, msg); + + verify(mgr).publish(chnl, msg); + } + + @Test + public void testPublishStringHeartbeat() { + String chnl = "channelH"; + Heartbeat msg = new Heartbeat(); + + state.publish(chnl, msg); + + verify(mgr).publish(chnl, msg); + } + + @Test + public void testStartDistributing() { + BucketAssignments asgn = new BucketAssignments(); + state.startDistributing(asgn); + + verify(mgr).startDistributing(asgn); + } + + @Test + public void testStartDistributing_NullAssignments() { + state.startDistributing(null); + + verify(mgr, never()).startDistributing(any()); + } + + @Test + public void testSchedule() { + int delay = 100; + + StateTimerTask task = mock(StateTimerTask.class); + + state.schedule(delay, task); + + ScheduledFuture<?> fut = onceFutures.removeFirst(); + + // scheduled, but not canceled yet + verify(mgr).schedule(delay, task); + verify(fut, never()).cancel(false); + + /* + * Ensure the state added the timer to its list by telling it to cancel + * its timers and then seeing if this timer was canceled. + */ + state.cancelTimers(); + verify(fut).cancel(false); + } + + @Test + public void testScheduleWithFixedDelay() { + int initdel = 100; + int delay = 200; + + StateTimerTask task = mock(StateTimerTask.class); + + state.scheduleWithFixedDelay(initdel, delay, task); + + ScheduledFuture<?> fut = repeatedFutures.removeFirst(); + + // scheduled, but not canceled yet + verify(mgr).scheduleWithFixedDelay(initdel, delay, task); + verify(fut, never()).cancel(false); + + /* + * Ensure the state added the timer to its list by telling it to cancel + * its timers and then seeing if this timer was canceled. + */ + state.cancelTimers(); + verify(fut).cancel(false); + } + + @Test + public void testInternalTopicFailed() { + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + State next2 = state.internalTopicFailed(); + assertEquals(next, next2); + + verify(mgr).internalTopicFailed(); + + Offline msg = captureAdminMessage(Offline.class); + assertEquals(MY_HOST, msg.getSource()); + } + + @Test + public void testMakeHeartbeat() { + long timestamp = 30000L; + Heartbeat msg = state.makeHeartbeat(timestamp); + + assertEquals(MY_HOST, msg.getSource()); + assertEquals(timestamp, msg.getTimestampMs()); + } + + @Test + public void testMakeOffline() { + Offline msg = state.makeOffline(); + + assertEquals(MY_HOST, msg.getSource()); + } + + @Test + public void testMakeQuery() { + Query msg = state.makeQuery(); + + assertEquals(MY_HOST, msg.getSource()); + } + + @Test + public void testGetHost() { + assertEquals(MY_HOST, state.getHost()); + } + + @Test + public void testGetTopic() { + assertEquals(MY_TOPIC, state.getTopic()); + } + + /** + * State used for testing purposes, with abstract methods implemented. + */ + private class MyState extends State { + + public MyState(PoolingManager mgr) { + super(mgr); + } + } +} |