aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java441
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java318
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java109
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java121
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java83
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java328
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java462
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java180
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java440
9 files changed, 2482 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
new file mode 100644
index 00000000..7997a4ee
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -0,0 +1,441 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class ActiveStateTest extends BasicStateTester {
+
+ private ActiveState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ActiveState(mgr);
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ // ensure the timers were created
+ verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class));
+
+ // ensure a heart beat was generated
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testProcessHeartbeat_NullHost() {
+ assertNull(state.process(new Heartbeat()));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_MyHost() {
+ assertNull(state.process(new Heartbeat(MY_HOST, 0L)));
+
+ assertTrue(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_Predecessor() {
+ assertNull(state.process(new Heartbeat(HOST2, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertTrue(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_OtherHost() {
+ assertNull(state.process(new Heartbeat(HOST3, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessOffline_NullHost() {
+ // should be ignored
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ public void testProcessOffline_UnassignedHost() {
+ // HOST4 is not in the assignment list - should be ignored
+ assertNull(state.process(new Offline(HOST4)));
+ }
+
+ @Test
+ public void testProcessOffline_IAmLeader() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // one of the assigned hosts went offline
+ assertEquals(next, state.process(new Offline(HOST1)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ public void testProcessOffline_PredecessorIsLeaderNowOffline() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // I am not the leader, but my predecessor was
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ // my predecessor went offline
+ assertEquals(next, state.process(new Offline(PREV_HOST)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ public void testProcessOffline__PredecessorIsNotLeaderNowOffline() {
+ // I am not the leader, and neither is my predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2}));
+ state = new ActiveState(mgr);
+
+ /*
+ *
+ * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader
+ * thus should be ignored.
+ */
+ assertNull(state.process(new Offline(PREV_HOST2)));
+ }
+
+ @Test
+ public void testProcessOffline_OtherAssignedHostOffline() {
+ // I am not the leader
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ /*
+ * HOST1 has buckets, but it isn't the leader and it isn't my
+ * predecessor, thus should be ignored.
+ */
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testActiveState() {
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+
+ // verify that it determined its neighbors
+ assertEquals(HOST1, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+ }
+
+ @Test
+ public void testDetmNeighbors() {
+ // if only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+ assertEquals(null, state.getSuccHost());
+ assertEquals("", state.getPredHost());
+
+ // two hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+
+ // three hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST3, state.getPredHost());
+
+ // more hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST4, state.getPredHost());
+ }
+
+ @Test
+ public void testAddTimers_WithPredecessor() {
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(3, repeatedFutures.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+
+ // predecessor's heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ }
+
+ @Test
+ public void testAddTimers_SansPredecessor() {
+ // only one host, thus no predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(2, repeatedFutures.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ }
+
+ @Test
+ public void testAddTimers_HeartbeatGenerator() {
+ // only one host so we only have to look at one heart beat at a time
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove();
+
+ verify(mgr).publish(anyString(), any(Heartbeat.class));
+
+ // fire the task
+ assertNull(task.third.fire(null));
+
+ // should have generated a second pair of heart beats
+ verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testAddTimers_MyHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // indicate that this host is still alive
+ state.process(new Heartbeat(MY_HOST, 0L));
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // fire the task - should not transition
+ assertNull(task.third.fire(null));
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testAddTimers_MyHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.third.fire(null));
+
+ // should indicate failure
+ verify(mgr).internalTopicFailed();
+
+ // should publish an offline message
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testAddTimers_PredecessorHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // indicate that the predecessor is still alive
+ state.process(new Heartbeat(HOST2, 0L));
+
+ // set up next state, just in case
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should NOT transition
+ assertNull(task.third.fire(null));
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testAddTimers_PredecessorHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.third.fire(null));
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testGenHeartbeat_OneHost() {
+ // only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ state.start();
+
+ verify(mgr, times(1)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testGenHeartbeat_MultipleHosts() {
+ state.start();
+
+ verify(mgr, times(2)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg;
+ int index = 0;
+
+ // this message should go to itself
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+
+ // this message should go to its successor
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(HOST1, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
new file mode 100644
index 00000000..e48742f7
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
@@ -0,0 +1,318 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+
+/**
+ * Superclass used to test subclasses of {@link Message}.
+ */
+public class BasicStateTester {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+
+ protected static final String MY_TOPIC = "myTopic";
+
+ protected static final String PREV_HOST = "prevHost";
+ protected static final String PREV_HOST2 = PREV_HOST + "A";
+
+ // this follows PREV_HOST, alphabetically
+ protected static final String MY_HOST = PREV_HOST + "X";
+
+ // these follow MY_HOST, alphabetically
+ protected static final String HOST1 = MY_HOST + "1";
+ protected static final String HOST2 = MY_HOST + "2";
+ protected static final String HOST3 = MY_HOST + "3";
+ protected static final String HOST4 = MY_HOST + "4";
+
+ protected static final String LEADER = HOST1;
+
+ protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
+
+ protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
+ protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
+
+ /**
+ * Futures returned by schedule().
+ */
+ protected LinkedList<ScheduledFuture<?>> onceFutures;
+
+ /**
+ * Tasks captured via schedule().
+ */
+ protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
+
+ /**
+ * Futures returned by scheduleWithFixedDelay().
+ */
+ protected LinkedList<ScheduledFuture<?>> repeatedFutures;
+
+ /**
+ * Tasks captured via scheduleWithFixedDelay().
+ */
+ protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
+
+ /**
+ * Messages captured via publish().
+ */
+ protected LinkedList<Pair<String, Message>> published;
+
+ /**
+ * Messages captured via publishAdmin().
+ */
+ protected LinkedList<Message> admin;
+
+ protected PoolingManager mgr;
+ protected PoolingProperties props;
+ protected State prevState;
+
+ public BasicStateTester() {
+ super();
+ }
+
+ public void setUp() throws Exception {
+ onceFutures = new LinkedList<>();
+ onceTasks = new LinkedList<>();
+
+ repeatedFutures = new LinkedList<>();
+ repeatedTasks = new LinkedList<>();
+
+ published = new LinkedList<>();
+ admin = new LinkedList<>();
+
+ mgr = mock(PoolingManager.class);
+ props = mock(PoolingProperties.class);
+
+ when(mgr.getHost()).thenReturn(MY_HOST);
+ when(mgr.getTopic()).thenReturn(MY_TOPIC);
+ when(mgr.getProperties()).thenReturn(props);
+
+ when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+
+ prevState = new State(mgr) {
+ @Override
+ public Map<String, Object> getFilter() {
+ throw new UnsupportedOperationException("cannot filter");
+ }
+ };
+
+ // capture publish() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ published.add(new Pair<>((String) args[0], (Message) args[1]));
+
+ return null;
+ }).when(mgr).publish(anyString(), any(Message.class));
+
+ // capture publishAdmin() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ admin.add((Message) args[0]);
+
+ return null;
+ }).when(mgr).publishAdmin(any(Message.class));
+
+ // capture schedule() arguments, and return a new future
+ when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
+
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ onceFutures.add(fut);
+ return fut;
+ });
+
+ // capture scheduleWithFixedDelay() arguments, and return a new future
+ when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
+
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ repeatedFutures.add(fut);
+ return fut;
+ });
+
+ // get/set assignments in the manager
+ AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
+
+ when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
+
+ doAnswer(args -> {
+ asgn.set(args.getArgumentAt(0, BucketAssignments.class));
+ return null;
+ }).when(mgr).startDistributing(any());
+ }
+
+ /**
+ * Makes a sorted set of hosts.
+ *
+ * @param hosts the hosts to be sorted
+ * @return the set of hosts, sorted
+ */
+ protected SortedSet<String> sortHosts(String... hosts) {
+ return new TreeSet<>(Arrays.asList(hosts));
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin
+ * channel.
+ *
+ * @return the host array, as a list
+ */
+ protected List<String> captureHostList() {
+ return Arrays.asList(captureHostArray());
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin
+ * channel.
+ *
+ * @return the host array
+ */
+ protected String[] captureHostArray() {
+ BucketAssignments asgn = captureAssignments();
+
+ String[] arr = asgn.getHostArray();
+ assertNotNull(arr);
+
+ return arr;
+ }
+
+ /**
+ * Captures the assignments from the Leader message published to the admin
+ * channel.
+ *
+ * @return the bucket assignments
+ */
+ protected BucketAssignments captureAssignments() {
+ Leader msg = captureAdminMessage(Leader.class);
+
+ BucketAssignments asgn = msg.getAssignments();
+ assertNotNull(asgn);
+ return asgn;
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
+ return captureAdminMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
+ return clazz.cast(admin.get(index));
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
+ return capturePublishedMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
+ Pair<String, Message> msg = published.get(index);
+ return new Pair<>(msg.first, clazz.cast(msg.second));
+ }
+
+ /**
+ * Pair of values.
+ *
+ * @param <F> first value's type
+ * @param <S> second value's type
+ */
+ public static class Pair<F, S> {
+ public final F first;
+ public final S second;
+
+ public Pair(F first, S second) {
+ this.first = first;
+ this.second = second;
+ }
+ }
+
+ /**
+ * Pair of values.
+ *
+ * @param <F> first value's type
+ * @param <S> second value's type
+ * @param <T> third value's type
+ */
+ public static class Triple<F, S, T> {
+ public final F first;
+ public final S second;
+ public final T third;
+
+ public Triple(F first, S second, T third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
new file mode 100644
index 00000000..ba517194
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.Map;
+import org.junit.Test;
+
+public class FilterUtilsTest {
+
+ @Test
+ public void testMakeEquals() {
+ checkEquals("abc", "def", makeEquals("abc", "def"));
+ }
+
+ @Test
+ public void testMakeAnd() {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> filter =
+ makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
+
+ checkArray(CLASS_AND, 3, filter);
+ checkEquals("an1", "av1", getItem(filter, 0));
+ checkEquals("an2", "av2", getItem(filter, 1));
+ checkEquals("an3", "av3", getItem(filter, 2));
+ }
+
+ @Test
+ public void testMakeOr() {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> filter =
+ makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
+
+ checkArray(CLASS_OR, 3, filter);
+ checkEquals("on1", "ov1", getItem(filter, 0));
+ checkEquals("on2", "ov2", getItem(filter, 1));
+ checkEquals("on3", "ov3", getItem(filter, 2));
+ }
+
+ /**
+ * Checks that the filter contains an array.
+ *
+ * @param expectedClassName type of filter this should represent
+ * @param expectedCount number of items expected in the array
+ * @param filter filter to be examined
+ */
+ protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
+ assertEquals(expectedClassName, filter.get(JSON_CLASS));
+
+ Object[] val = (Object[]) filter.get(JSON_FILTERS);
+ assertEquals(expectedCount, val.length);
+ }
+
+ /**
+ * Checks that a map represents an "equals".
+ *
+ * @param name name of the field on the left side of the equals
+ * @param value value on the right side of the equals
+ * @param map map whose content is to be examined
+ */
+ protected void checkEquals(String name, String value, Map<String, Object> map) {
+ assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
+ assertEquals(name, map.get(JSON_FIELD));
+ assertEquals(value, map.get(JSON_VALUE));
+ }
+
+ /**
+ * Gets a particular sub-filter from the array contained within a filter.
+ *
+ * @param filter containing filter
+ * @param index index of the sub-filter of interest
+ * @return the sub-filter with the given index
+ */
+ @SuppressWarnings("unchecked")
+ protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
+ Object[] val = (Object[]) filter.get(JSON_FILTERS);
+
+ return (Map<String, Object>) val[index];
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
new file mode 100644
index 00000000..96c59719
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class IdleStateTest extends BasicStateTester {
+
+ private IdleState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new IdleState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStop() {
+ state.stop();
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessForward() {
+ Forward msg = new Forward();
+ assertNull(state.process(msg));
+
+ verify(mgr).handle(msg);
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification(PREV_HOST, null)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessLeader() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, PREV_HOST, MY_HOST});
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should stay in current state, but start distributing
+ assertNull(state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline(PREV_HOST)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ verifyNothingPublished();
+ }
+
+ /**
+ * Verifies that nothing was published on either channel.
+ */
+ private void verifyNothingPublished() {
+ verify(mgr, never()).publish(any(), any());
+ verify(mgr, never()).publishAdmin(any());
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
new file mode 100644
index 00000000..48d5b1ed
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.Message;
+
+public class InactiveStateTest extends BasicStateTester {
+
+ private InactiveState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new InactiveState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testGoInatcive() {
+ assertNull(state.goInactive());
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+ }
+
+ @Test
+ public void testInactiveState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
new file mode 100644
index 00000000..d60ad2ea
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -0,0 +1,328 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class ProcessingStateTest extends BasicStateTester {
+
+ private ProcessingState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ProcessingState(mgr, MY_HOST);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testProcessingState() {
+ /*
+ * Null assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Empty assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ state = new ProcessingState(mgr, LEADER);
+ assertEquals(MY_HOST, state.getHost());
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ /*
+ * Now try something with assignments.
+ */
+ when(mgr.getAssignments()).thenReturn(ASGN3);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testProcessingState_NullLeader() {
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ state = new ProcessingState(mgr, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testProcessingState_ZeroLengthHostArray() {
+ when(mgr.getAssignments()).thenReturn(new BucketAssignments(new String[] {}));
+ state = new ProcessingState(mgr, LEADER);
+ }
+
+ @Test
+ public void testMakeIdentification() {
+ Identification ident = state.makeIdentification();
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testGetAssignments() {
+ // assignments from constructor
+ assertEquals(ASGN3, state.getAssignments());
+
+ // null assignments - no effect
+ state.setAssignments(null);
+ assertEquals(ASGN3, state.getAssignments());
+
+ // empty assignments
+ state.setAssignments(EMPTY_ASGN);
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ // non-empty assignments
+ state.setAssignments(ASGN3);
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testSetAssignments() {
+ state.setAssignments(null);
+ verify(mgr, never()).startDistributing(any());
+
+ state.setAssignments(ASGN3);
+ verify(mgr).startDistributing(ASGN3);
+ }
+
+ @Test
+ public void testGetLeader() {
+ // check value from constructor
+ assertEquals(MY_HOST, state.getLeader());
+
+ state.setLeader(HOST2);
+ assertEquals(HOST2, state.getLeader());
+
+ state.setLeader(HOST3);
+ assertEquals(HOST3, state.getLeader());
+ }
+
+ @Test
+ public void testSetLeader() {
+ state.setLeader(MY_HOST);
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetLeader_Null() {
+ state.setLeader(null);
+ }
+
+ @Test
+ public void testIsLeader() {
+ state.setLeader(MY_HOST);
+ assertTrue(state.isLeader());
+
+ state.setLeader(HOST2);
+ assertFalse(state.isLeader());
+ }
+
+ @Test
+ public void testBecomeLeader() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, state.becomeLeader(sortHosts(MY_HOST, HOST2)));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ verify(mgr).startDistributing(msg.getAssignments());
+ verify(mgr).goActive();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBecomeLeader_NotFirstAlive() {
+ // alive list contains something before my host name
+ state.becomeLeader(sortHosts(PREV_HOST, MY_HOST));
+ }
+
+ @Test
+ public void testMakeLeader() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ // need a channel before invoking checkValidity()
+ msg.setChannel(Message.ADMIN);
+
+ msg.checkValidity();
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertNotNull(msg.getAssignments());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ assertTrue(msg.getAssignments().hasAssignment(HOST2));
+
+ // this one wasn't in the list of hosts, so it should have been removed
+ assertFalse(msg.getAssignments().hasAssignment(HOST1));
+ }
+
+ @Test
+ public void testMakeAssignments() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ captureAssignments().checkValidity();
+ }
+
+ @Test
+ public void testMakeBucketArray_NullAssignments() {
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, MY_HOST);
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host)));
+ }
+
+ @Test
+ public void testMakeBucketArray_ZeroAssignments() {
+ // bucket assignment with a zero-length array
+ state.setAssignments(new BucketAssignments(new String[0]));
+
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host)));
+ }
+
+ @Test
+ public void testMakeBucketArray() {
+ /*
+ * All hosts are still alive, so it should have the exact same
+ * assignments as it had to start.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(HOST_ARR3));
+
+ String[] arr = captureHostArray();
+
+ assertTrue(arr != HOST_ARR3);
+ assertEquals(Arrays.asList(HOST_ARR3), Arrays.asList(arr));
+ }
+
+ @Test
+ public void testRemoveExcessHosts() {
+ /**
+ * All hosts are still alive, plus some others.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3, HOST4));
+
+ // assignments should be unchanged
+ assertEquals(Arrays.asList(HOST_ARR3), captureHostList());
+ }
+
+ @Test
+ public void testAddIndicesToHostBuckets() {
+ // some are null, some hosts are no longer alive
+ String[] asgn = {null, MY_HOST, HOST3, null, HOST4, HOST1, HOST2};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ // every bucket should be assigned to one of the three hosts
+ String[] expected = {MY_HOST, MY_HOST, HOST1, HOST2, MY_HOST, HOST1, HOST2};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ public void testAssignNullBuckets() {
+ /*
+ * Ensure buckets are assigned to the host with the fewest buckets.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, null, null, null, null, null, MY_HOST};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ String[] expected = {MY_HOST, HOST1, MY_HOST, HOST2, HOST1, HOST2, HOST1, HOST2, MY_HOST};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ public void testRebalanceBuckets() {
+ /**
+ * Some are very lopsided.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3));
+
+ String[] expected = {HOST2, HOST1, HOST3, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
new file mode 100644
index 00000000..d714d5cc
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -0,0 +1,462 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class QueryStateTest extends BasicStateTester {
+
+ private QueryState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new QueryState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+ }
+
+ @Test
+ public void testGoQuery() {
+ assertNull(state.process(new Query()));
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessIdentification_NullSource() {
+ assertNull(state.process(new Identification()));
+
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessIdentification_NewLeader() {
+ assertNull(state.process(new Identification(PREV_HOST, null)));
+
+ assertEquals(PREV_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessIdentification_NotNewLeader() {
+ assertNull(state.process(new Identification(HOST2, null)));
+
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessLeader_NullAssignment() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_NullSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(null, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_SourceIsNotAssignmentLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST2, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_EmptyAssignment() {
+ Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_BetterLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_NotABetterLeader() {
+ // no assignments yet
+ mgr.startDistributing(null);
+ state = new QueryState(mgr);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should stay in the same state
+ assertNull(state.process(msg));
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // should have started distributing
+ verify(mgr).startDistributing(asgn);
+
+ // this host should still be the leader
+ assertEquals(MY_HOST, state.getLeader());
+
+ // new assignments
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessOffline_NullHost() {
+ assertNull(state.process(new Offline()));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessOffline_SameHost() {
+ assertNull(state.process(new Offline(MY_HOST)));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessOffline_DiffHost() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ // tell it that the hosts are alive
+ state.process(new Identification(PREV_HOST, asgn));
+ state.process(new Identification(HOST1, asgn));
+
+ // #2 goes offline
+ assertNull(state.process(new Offline(HOST1)));
+
+ // #1 should still be the leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // #1 goes offline
+ assertNull(state.process(new Offline(PREV_HOST)));
+
+ // this should still be the leader now
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessQuery() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(null, state.process(new Query()));
+
+ verify(mgr).publishAdmin(any(Identification.class));
+ }
+
+ @Test
+ public void testQueryState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testAwaitIdentification_Leader() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should have published a Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+ assertEquals(MY_HOST, msg.getSource());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ }
+
+ @Test
+ public void testAwaitIdentification_HasAssignment() {
+ // not the leader, but has an assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ // set up active state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+
+ // should have gone active with the current assignments
+ verify(mgr).goActive();
+ }
+
+ @Test
+ public void testAwaitIdentification_NoAssignment() {
+ // not the leader and no assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ // set up inactive state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+ }
+
+ @Test
+ public void testHasAssignment() {
+ // null assignment
+ mgr.startDistributing(null);
+ assertFalse(state.hasAssignment());
+
+ // not in assignments
+ state.setAssignments(new BucketAssignments(new String[] {HOST3}));
+ assertFalse(state.hasAssignment());
+
+ // it IS in the assignments
+ state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
+ assertTrue(state.hasAssignment());
+ }
+
+ @Test
+ public void testRecordInfo_NullSource() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(null, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_SourcePreceedsMyHost() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(PREV_HOST, asgn));
+
+ // new leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_SourceFollowsMyHost() {
+ mgr.startDistributing(null);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewIsNull() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(HOST1, null));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewIsEmpty() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(PREV_HOST, new BucketAssignments()));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_OldIsNull() {
+ mgr.startDistributing(null);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_OldIsEmpty() {
+ state.setAssignments(new BucketAssignments());
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewLeaderPreceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(HOST3, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewLeaderSucceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3});
+ state.process(new Identification(HOST3, asgn));
+
+ // should be unchanged
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
new file mode 100644
index 00000000..f29d2348
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -0,0 +1,180 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class StartStateTest extends BasicStateTester {
+
+ private StartState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new StartState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+
+ // get the sub-filter
+ filter = utils.getItem(filter, 1);
+
+ utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
+ utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs());
+
+ Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ verify(mgr).internalTopicFailed();
+ }
+
+ @Test
+ public void testStartStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testStartStateState() {
+ // create a new state from the current state
+ state = new StartState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testProcessForward() {
+ assertNull(state.process(new Forward()));
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ Heartbeat msg = new Heartbeat();
+
+ // no matching data in heart beat
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same source, different time stamp
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs() - 1);
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same time stamp, different source
+ msg.setSource("unknown");
+ msg.setTimestampMs(state.getHbTimestampMs());
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // matching heart beat
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs());
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(msg));
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification(MY_HOST, null)));
+ }
+
+ @Test
+ public void testProcessLeader() {
+ assertNull(state.process(new Leader(MY_HOST, null)));
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ public void testGetHbTimestampMs() {
+ long tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+
+ tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
new file mode 100644
index 00000000..1be48e21
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -0,0 +1,440 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class StateTest extends BasicStateTester {
+
+ private State state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new MyState(mgr);
+ }
+
+ @Test
+ public void testStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testStateState() {
+ // allocate a new state, copying from the old state
+ state = new MyState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testCancelTimers() {
+ int delay = 100;
+ int initDelay = 200;
+
+ /*
+ * Create three tasks tasks.
+ */
+
+ StateTimerTask task1 = mock(StateTimerTask.class);
+ StateTimerTask task2 = mock(StateTimerTask.class);
+ StateTimerTask task3 = mock(StateTimerTask.class);
+
+ // two tasks via schedule()
+ state.schedule(delay, task1);
+ state.schedule(delay, task2);
+
+ // one task via scheduleWithFixedDelay()
+ state.scheduleWithFixedDelay(initDelay, delay, task3);
+
+ // ensure all were scheduled, but not yet canceled
+ verify(mgr).schedule(delay, task1);
+ verify(mgr).schedule(delay, task2);
+ verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
+
+ ScheduledFuture<?> fut1 = onceFutures.removeFirst();
+ ScheduledFuture<?> fut2 = onceFutures.removeFirst();
+ ScheduledFuture<?> fut3 = repeatedFutures.removeFirst();
+
+ verify(fut1, never()).cancel(false);
+ verify(fut2, never()).cancel(false);
+ verify(fut3, never()).cancel(false);
+
+ /*
+ * Cancel the timers.
+ */
+ state.cancelTimers();
+
+ // verify that all were cancelled
+ verify(fut1).cancel(false);
+ verify(fut2).cancel(false);
+ verify(fut3).cancel(false);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+ }
+
+ @Test
+ public void testStop() {
+ state.stop();
+
+ assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource());
+ }
+
+ @Test
+ public void testGoStart() {
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ State next2 = state.goStart();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ State next2 = state.goQuery();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoActive() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ State next2 = state.goActive();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoInactive() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.goInactive();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testProcessForward() {
+ Forward msg = new Forward();
+ assertNull(state.process(msg));
+
+ verify(mgr).handle(msg);
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat()));
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification()));
+ }
+
+ @Test
+ public void testProcessLeader_NullAssignment() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_NullSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(null, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_EmptyAssignment() {
+ Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_MyHostAssigned() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_MyHostUnassigned() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should go Inactive and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goActive();
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ public void testPublishIdentification() {
+ Identification msg = new Identification();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishLeader() {
+ Leader msg = new Leader();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishOffline() {
+ Offline msg = new Offline();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishQuery() {
+ Query msg = new Query();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishStringForward() {
+ String chnl = "channelF";
+ Forward msg = new Forward();
+
+ state.publish(chnl, msg);
+
+ verify(mgr).publish(chnl, msg);
+ }
+
+ @Test
+ public void testPublishStringHeartbeat() {
+ String chnl = "channelH";
+ Heartbeat msg = new Heartbeat();
+
+ state.publish(chnl, msg);
+
+ verify(mgr).publish(chnl, msg);
+ }
+
+ @Test
+ public void testStartDistributing() {
+ BucketAssignments asgn = new BucketAssignments();
+ state.startDistributing(asgn);
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testStartDistributing_NullAssignments() {
+ state.startDistributing(null);
+
+ verify(mgr, never()).startDistributing(any());
+ }
+
+ @Test
+ public void testSchedule() {
+ int delay = 100;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.schedule(delay, task);
+
+ ScheduledFuture<?> fut = onceFutures.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).schedule(delay, task);
+ verify(fut, never()).cancel(false);
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel
+ * its timers and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(fut).cancel(false);
+ }
+
+ @Test
+ public void testScheduleWithFixedDelay() {
+ int initdel = 100;
+ int delay = 200;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.scheduleWithFixedDelay(initdel, delay, task);
+
+ ScheduledFuture<?> fut = repeatedFutures.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
+ verify(fut, never()).cancel(false);
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel
+ * its timers and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(fut).cancel(false);
+ }
+
+ @Test
+ public void testInternalTopicFailed() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.internalTopicFailed();
+ assertEquals(next, next2);
+
+ verify(mgr).internalTopicFailed();
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testMakeHeartbeat() {
+ long timestamp = 30000L;
+ Heartbeat msg = state.makeHeartbeat(timestamp);
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertEquals(timestamp, msg.getTimestampMs());
+ }
+
+ @Test
+ public void testMakeOffline() {
+ Offline msg = state.makeOffline();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testMakeQuery() {
+ Query msg = state.makeQuery();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testGetHost() {
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(MY_TOPIC, state.getTopic());
+ }
+
+ /**
+ * State used for testing purposes, with abstract methods implemented.
+ */
+ private class MyState extends State {
+
+ public MyState(PoolingManager mgr) {
+ super(mgr);
+ }
+ }
+}