aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-03-26 16:48:31 -0400
committerJim Hahn <jrh3@att.com>2018-03-28 23:47:53 -0400
commita3fa1c69a955af57f4e9023488bac3ef67a4fc3e (patch)
tree0f5173ea23c5d40cdef0f64dffc3fc18e695cf64 /feature-pooling-dmaap/src/test/java/org/onap
parent1d2c8346e0ac02320ca933b66c1943c7f72343c6 (diff)
Add pooling capability
Add an optional feature that that supports session pooling, wherein more than one host can be active at a time. Use beforeInsert() instead of beforeOffer(), where possible. Move request-id-extraction from policy-managment to feature-pooling. Combined AdditionalProperties into PoolingProperties. Finished junit tests for DmaapManager. Adjusted filters for all XxxState classes, and added testGetFilter to all XxxStateTest classes. Always publish Offline message when the internal topic fails. Remove DelayedExtractor, as it isn't needed. Renamed ExtractorMap to ClassExtractors, and added property name prefix to the constructor to give more control over property naming to invokers. Remove State copy constructor. Use class name instead of class in ClassExtractors map. Remove BucketAssignments from ProcessingState. Remove some TODO items. Add META-INF for implemented feature APIs. Fix ClassExtractor bug where it can't find a field in a superclass, and add a test for classes defined in another file. Add assembly and rename project directory. Add more junit coverage. Change-Id: I7f132f84a7b284a58ab09c9069db19b853acd7e9 Issue-ID: POLICY-577 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java355
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java196
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java74
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java42
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java35
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java495
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java1342
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java178
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java96
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java186
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java440
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java40
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java32
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java34
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java77
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java72
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java99
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties33
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BasicMessageTester.java245
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java351
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java217
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java62
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java77
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java77
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java80
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageWithAssignmentsTester.java110
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java41
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java41
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java41
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java441
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java318
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java109
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java121
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java83
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java328
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java462
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java180
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java440
38 files changed, 7650 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
new file mode 100644
index 00000000..f68f2395
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -0,0 +1,355 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Properties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.pooling.DmaapManager.Factory;
+
+public class DmaapManagerTest {
+
+ private static String MY_TOPIC = "my.topic";
+ private static String MSG = "a message";
+ private static String FILTER = "a filter";
+
+ /**
+ * Original factory, to be restored when all tests complete.
+ */
+ private static Factory saveFactory;
+
+ private Properties props;
+ private Factory factory;
+ private TopicListener listener;
+ private FilterableTopicSource source;
+ private TopicSink sink;
+ private DmaapManager mgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ saveFactory = DmaapManager.getFactory();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ DmaapManager.setFactory(saveFactory);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ props = new Properties();
+
+ listener = mock(TopicListener.class);
+ factory = mock(Factory.class);
+ source = mock(FilterableTopicSource.class);
+ sink = mock(TopicSink.class);
+
+ DmaapManager.setFactory(factory);
+
+ when(source.getTopic()).thenReturn(MY_TOPIC);
+
+ when(sink.getTopic()).thenReturn(MY_TOPIC);
+ when(sink.send(any())).thenReturn(true);
+
+ // three sources, with the desired one in the middle
+ when(factory.initTopicSources(props))
+ .thenReturn(Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class)));
+
+ // three sinks, with the desired one in the middle
+ when(factory.initTopicSinks(props))
+ .thenReturn(Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class)));
+
+ mgr = new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test
+ public void testDmaapManager() {
+ // verify that the init methods were called
+ verify(factory).initTopicSinks(props);
+ verify(factory).initTopicSinks(props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testDmaapManager_PoolingEx() throws PoolingFeatureException {
+ // force error by having no topics match
+ when(source.getTopic()).thenReturn("");
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException {
+ // force error
+ when(factory.initTopicSources(props)).thenThrow(new IllegalArgumentException("expected"));
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testDmaapManager_CannotFilter() throws PoolingFeatureException {
+ // force an error when setFilter() is called
+ doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(MY_TOPIC, mgr.getTopic());
+ }
+
+ @Test
+ public void testFindTopicSource() {
+ // getting here means it worked
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testFindTopicSource_NotFilterableTopicSource() throws PoolingFeatureException {
+
+ // matching topic, but doesn't have the correct interface
+ TopicSource source2 = mock(TopicSource.class);
+ when(source2.getTopic()).thenReturn(MY_TOPIC);
+
+ when(factory.initTopicSources(props)).thenReturn(Arrays.asList(source2));
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testFindTopicSource_NotFound() throws PoolingFeatureException {
+ // one item in list, and its topic doesn't match
+ when(factory.initTopicSources(props)).thenReturn(Arrays.asList(mock(TopicSource.class)));
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testFindTopicSource_EmptyList() throws PoolingFeatureException {
+ // empty list
+ when(factory.initTopicSources(props)).thenReturn(new LinkedList<>());
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test
+ public void testFindTopicSink() {
+ // getting here means it worked
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testFindTopicSink_NotFound() throws PoolingFeatureException {
+ // one item in list, and its topic doesn't match
+ when(factory.initTopicSinks(props)).thenReturn(Arrays.asList(mock(TopicSink.class)));
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testFindTopicSink_EmptyList() throws PoolingFeatureException {
+ // empty list
+ when(factory.initTopicSinks(props)).thenReturn(new LinkedList<>());
+
+ new DmaapManager(MY_TOPIC, props);
+ }
+
+ @Test
+ public void testStartPublisher() throws PoolingFeatureException {
+ // not started yet
+ verify(sink, never()).start();
+
+ mgr.startPublisher();
+ verify(sink).start();
+
+ // restart should have no effect
+ mgr.startPublisher();
+ verify(sink).start();
+
+ // should be able to publish now
+ mgr.publish(MSG);
+ verify(sink).send(MSG);
+ }
+
+ @Test
+ public void testStartPublisher_Exception() throws PoolingFeatureException {
+ // force exception when it starts
+ doThrow(new IllegalStateException("expected")).when(sink).start();
+
+ expectException("startPublisher,start", xxx -> mgr.startPublisher());
+ expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
+
+ // allow it to succeed this time
+ reset(sink);
+ when(sink.send(any())).thenReturn(true);
+
+ mgr.startPublisher();
+ verify(sink).start();
+
+ // should be able to publish now
+ mgr.publish(MSG);
+ verify(sink).send(MSG);
+ }
+
+ @Test
+ public void testStopPublisher() throws PoolingFeatureException {
+ // not publishing yet, so stopping should have no effect
+ mgr.stopPublisher();
+ verify(sink, never()).stop();
+
+ // now start it
+ mgr.startPublisher();
+
+ // this time, stop should do something
+ mgr.stopPublisher();
+ verify(sink).stop();
+
+ // re-stopping should have no effect
+ mgr.stopPublisher();
+ verify(sink).stop();
+ }
+
+ @Test
+ public void testStopPublisher_Exception() throws PoolingFeatureException {
+ mgr.startPublisher();
+
+ // force exception when it stops
+ doThrow(new IllegalStateException("expected")).when(sink).stop();
+
+ mgr.stopPublisher();
+ }
+
+ @Test
+ public void testStartConsumer() {
+ // not started yet
+ verify(source, never()).register(any());
+
+ mgr.startConsumer(listener);
+ verify(source).register(listener);
+
+ // restart should have no effect
+ mgr.startConsumer(listener);
+ verify(source).register(listener);
+ }
+
+ @Test
+ public void testStopConsumer() {
+ // not consuming yet, so stopping should have no effect
+ mgr.stopConsumer(listener);
+ verify(source, never()).unregister(any());
+
+ // now start it
+ mgr.startConsumer(listener);
+
+ // this time, stop should do something
+ mgr.stopConsumer(listener);
+ verify(source).unregister(listener);
+
+ // re-stopping should have no effect
+ mgr.stopConsumer(listener);
+ verify(source).unregister(listener);
+ }
+
+ @Test
+ public void testSetFilter() throws PoolingFeatureException {
+ mgr.setFilter(FILTER);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testSetFilter_Exception() throws PoolingFeatureException {
+ // force an error when setFilter() is called
+ doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
+
+ mgr.setFilter(FILTER);
+ }
+
+ @Test
+ public void testPublish() throws PoolingFeatureException {
+ // cannot publish before starting
+ expectException("publish,pre", xxx -> mgr.publish(MSG));
+
+ mgr.startPublisher();
+
+ // publish several messages
+ mgr.publish(MSG);
+ verify(sink).send(MSG);
+
+ mgr.publish(MSG+"a");
+ verify(sink).send(MSG+"a");
+
+ mgr.publish(MSG+"b");
+ verify(sink).send(MSG+"b");
+
+ // stop and verify we can no longer publish
+ mgr.stopPublisher();
+ expectException("publish,stopped", xxx -> mgr.publish(MSG));
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testPublish_SendFailed() throws PoolingFeatureException {
+ mgr.startPublisher();
+
+ // arrange for send() to fail
+ when(sink.send(MSG)).thenReturn(false);
+
+ mgr.publish(MSG);
+ }
+
+ @Test(expected = PoolingFeatureException.class)
+ public void testPublish_SendEx() throws PoolingFeatureException {
+ mgr.startPublisher();
+
+ // arrange for send() to throw an exception
+ doThrow(new IllegalStateException("expected")).when(sink).send(MSG);
+
+ mgr.publish(MSG);
+ }
+
+ private void expectException(String testnm, VFunction func) {
+ try {
+ func.apply(null);
+ fail(testnm + " missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // OK
+ }
+ }
+
+ @FunctionalInterface
+ public static interface VFunction {
+ public void apply(Void arg) throws PoolingFeatureException;
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
new file mode 100644
index 00000000..24144686
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
@@ -0,0 +1,196 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.util.LinkedList;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.pooling.message.Forward;
+
+public class EventQueueTest {
+
+ private static final int MAX_SIZE = 5;
+ private static final long MAX_AGE_MS = 3000L;
+
+ private static final String MY_SOURCE = "my.source";
+ private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB;
+ private static final String MY_TOPIC = "my.topic";
+ private static final String MY_PAYLOAD = "my.payload";
+ private static final String MY_REQID = "my.request.id";
+
+ private EventQueue queue;
+
+ @Before
+ public void setUp() {
+ queue = new EventQueue(MAX_SIZE, MAX_AGE_MS);
+
+ }
+
+ @Test
+ public void testEventQueue() {
+ // shouldn't generate an exception
+ new EventQueue(1, 1);
+ }
+
+ @Test
+ public void testClear() {
+ // add some items
+ queue.add(makeActive());
+ queue.add(makeActive());
+
+ assertFalse(queue.isEmpty());
+
+ queue.clear();
+
+ // should be empty now
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ // test when empty
+ assertTrue(queue.isEmpty());
+
+ // all active
+ Forward msg1 = makeActive();
+ Forward msg2 = makeActive();
+ queue.add(msg1);
+ assertFalse(queue.isEmpty());
+
+ queue.add(msg2);
+ assertFalse(queue.isEmpty());
+
+ assertEquals(msg1, queue.poll());
+ assertFalse(queue.isEmpty());
+
+ assertEquals(msg2, queue.poll());
+ assertTrue(queue.isEmpty());
+
+ // active, expired, expired, active
+ queue.add(msg1);
+ queue.add(makeInactive());
+ queue.add(makeInactive());
+ queue.add(msg2);
+
+ assertEquals(msg1, queue.poll());
+ assertFalse(queue.isEmpty());
+
+ assertEquals(msg2, queue.poll());
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void testSize() {
+ queue = new EventQueue(2, 1000L);
+ assertEquals(0, queue.size());
+
+ queue.add(makeActive());
+ assertEquals(1, queue.size());
+
+ queue.poll();
+ assertEquals(0, queue.size());
+
+ queue.add(makeActive());
+ queue.add(makeActive());
+ assertEquals(2, queue.size());
+
+ queue.poll();
+ assertEquals(1, queue.size());
+
+ queue.poll();
+ assertEquals(0, queue.size());
+ }
+
+ @Test
+ public void testAdd() {
+ int nextra = 3;
+
+ // create excess messages
+ LinkedList<Forward> msgs = new LinkedList<>();
+ for (int x = 0; x < MAX_SIZE + nextra; ++x) {
+ msgs.add(makeActive());
+ }
+
+ // add them to the queue
+ msgs.forEach(msg -> queue.add(msg));
+
+ // should not have added too many messages
+ assertEquals(MAX_SIZE, queue.size());
+
+ // should have discarded the first "nextra" items
+ for (int x = 0; x < MAX_SIZE; ++x) {
+ assertEquals("x=" + x, msgs.get(x + nextra), queue.poll());
+ }
+
+ assertEquals(null, queue.poll());
+ }
+
+ @Test
+ public void testPoll() {
+ // poll when empty
+ assertNull(queue.poll());
+
+ // all active
+ Forward msg1 = makeActive();
+ Forward msg2 = makeActive();
+ queue.add(msg1);
+ queue.add(msg2);
+
+ assertEquals(msg1, queue.poll());
+ assertEquals(msg2, queue.poll());
+ assertEquals(null, queue.poll());
+
+ // active, expired, expired, active
+ queue.add(msg1);
+ queue.add(makeInactive());
+ queue.add(makeInactive());
+ queue.add(msg2);
+
+ assertEquals(msg1, queue.poll());
+ assertEquals(msg2, queue.poll());
+ assertEquals(null, queue.poll());
+
+ // one that's close to the age limit
+ msg1 = makeActive();
+ msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100);
+ queue.add(msg1);
+ assertEquals(msg1, queue.poll());
+ assertEquals(null, queue.poll());
+ }
+
+ private Forward makeActive() {
+ return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
+ }
+
+ private Forward makeInactive() {
+ Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
+
+ msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java
new file mode 100644
index 00000000..5f918f73
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.generalize;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
+import java.util.Properties;
+import org.junit.Test;
+
+public class FeatureEnabledCheckerTest {
+
+ private static final String PROP_NAME = "enable.{?.}it";
+
+ private static final String SPEC = "my.specializer";
+
+ @Test
+ public void test() {
+ assertFalse(check(null, null));
+ assertTrue(check(null, true));
+ assertFalse(check(null, false));
+
+ assertTrue(check(true, null));
+ assertTrue(check(true, true));
+ assertFalse(check(true, false));
+
+ assertFalse(check(false, null));
+ assertTrue(check(false, true));
+ assertFalse(check(false, false));
+ }
+
+ /**
+ * Adds properties, as specified, and checks if the feature is enabled.
+ *
+ * @param wantGen value to assign to the generalized property, or
+ * {@code null} to leave it unset
+ * @param wantSpec value to assign to the specialized property, or
+ * {@code null} to leave it unset
+ * @return {@code true} if the feature is enabled, {@code false} otherwise
+ */
+ public boolean check(Boolean wantGen, Boolean wantSpec) {
+ Properties props = new Properties();
+
+ if (wantGen != null) {
+ props.setProperty(generalize(PROP_NAME), wantGen.toString());
+ }
+
+ if (wantSpec != null) {
+ props.setProperty(specialize(PROP_NAME, SPEC), wantSpec.toString());
+ }
+
+ return FeatureEnabledChecker.isFeatureEnabled(props, SPEC, PROP_NAME);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
new file mode 100644
index 00000000..5b423d4b
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+
+public class PoolingFeatureExceptionTest extends ExceptionsTester {
+
+ @Test
+ public void test() {
+ assertEquals(5, test(PoolingFeatureException.class));
+ }
+
+ @Test
+ public void testToRuntimeException() {
+ PoolingFeatureException plainExc = new PoolingFeatureException("hello");
+ PoolingFeatureRtException runtimeExc = plainExc.toRuntimeException();
+
+ assertTrue(plainExc == runtimeExc.getCause());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java
new file mode 100644
index 00000000..cbb24421
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.drools.pooling.PoolingFeatureRtException;
+
+public class PoolingFeatureRtExceptionTest extends ExceptionsTester {
+
+ @Test
+ public void test() {
+ assertEquals(5, test(PoolingFeatureRtException.class));
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
new file mode 100644
index 00000000..cd1aea09
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -0,0 +1,495 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.LinkedList;
+import java.util.List;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.pooling.PoolingFeature.Factory;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.utils.Pair;
+
+public class PoolingFeatureTest {
+
+ private static final String CONFIG_DIR = "src/test/java/org/onap/policy/drools/pooling";
+
+ private static final String CONTROLLER1 = "controllerA";
+ private static final String CONTROLLER2 = "controllerB";
+ private static final String CONTROLLER_DISABLED = "controllerDisabled";
+ private static final String CONTROLLER_EX = "controllerException";
+ private static final String CONTROLLER_UNKNOWN = "controllerUnknown";
+
+ private static final String TOPIC1 = "topic.one";
+ private static final String TOPIC2 = "topic.two";
+
+ private static final String EVENT1 = "event.one";
+ private static final String EVENT2 = "event.two";
+
+ private static final Object OBJECT1 = new Object();
+ private static final Object OBJECT2 = new Object();
+
+ /**
+ * Saved from PoolingFeature and restored on exit from this test class.
+ */
+ private static Factory saveFactory;
+
+ private PolicyController controller1;
+ private PolicyController controller2;
+ private PolicyController controllerDisabled;
+ private PolicyController controllerException;
+ private PolicyController controllerUnknown;
+ private DroolsController drools1;
+ private DroolsController drools2;
+ private DroolsController droolsDisabled;
+ private List<Pair<PoolingManagerImpl, PoolingProperties>> managers;
+ private PoolingManagerImpl mgr1;
+ private PoolingManagerImpl mgr2;
+ private Factory factory;
+
+ private PoolingFeature pool;
+
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveFactory = PoolingFeature.getFactory();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PoolingFeature.setFactory(saveFactory);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ factory = mock(Factory.class);
+ controller1 = mock(PolicyController.class);
+ controller2 = mock(PolicyController.class);
+ controllerDisabled = mock(PolicyController.class);
+ controllerException = mock(PolicyController.class);
+ controllerUnknown = mock(PolicyController.class);
+ drools1 = mock(DroolsController.class);
+ drools2 = mock(DroolsController.class);
+ droolsDisabled = mock(DroolsController.class);
+ managers = new LinkedList<>();
+
+ PoolingFeature.setFactory(factory);
+
+ when(controller1.getName()).thenReturn(CONTROLLER1);
+ when(controller2.getName()).thenReturn(CONTROLLER2);
+ when(controllerDisabled.getName()).thenReturn(CONTROLLER_DISABLED);
+ when(controllerException.getName()).thenReturn(CONTROLLER_EX);
+ when(controllerUnknown.getName()).thenReturn(CONTROLLER_UNKNOWN);
+
+ when(factory.getController(drools1)).thenReturn(controller1);
+ when(factory.getController(drools2)).thenReturn(controller2);
+ when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
+
+ when(factory.makeManager(any(), any())).thenAnswer(args -> {
+ PoolingProperties props = args.getArgumentAt(1, PoolingProperties.class);
+
+ PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
+
+ managers.add(new Pair<>(mgr, props));
+
+ return mgr;
+ });
+
+ pool = new PoolingFeature();
+
+ pool.globalInit(null, CONFIG_DIR);
+
+ pool.afterCreate(controller1);
+ pool.afterCreate(controller2);
+
+ mgr1 = managers.get(0).first();
+ mgr2 = managers.get(1).first();
+ }
+
+ @Test
+ public void test() {
+ assertEquals(2, managers.size());
+ }
+
+ @Test
+ public void testGetSequenceNumber() {
+ assertEquals(0, pool.getSequenceNumber());
+ }
+
+ @Test
+ public void testGlobalInit() {
+ pool = new PoolingFeature();
+
+ pool.globalInit(null, CONFIG_DIR);
+ }
+
+ @Test(expected = PoolingFeatureRtException.class)
+ public void testGlobalInit_NotFound() {
+ pool = new PoolingFeature();
+
+ pool.globalInit(null, CONFIG_DIR + "/unknown");
+ }
+
+ @Test
+ public void testAfterCreate() {
+ managers.clear();
+ pool = new PoolingFeature();
+ pool.globalInit(null, CONFIG_DIR);
+
+ assertFalse(pool.afterCreate(controller1));
+ assertEquals(1, managers.size());
+
+ // duplicate
+ assertFalse(pool.afterCreate(controller1));
+ assertEquals(1, managers.size());
+
+ // second controller
+ assertFalse(pool.afterCreate(controller2));
+ assertEquals(2, managers.size());
+ }
+
+ @Test
+ public void testAfterCreate_NotEnabled() {
+ managers.clear();
+ pool = new PoolingFeature();
+ pool.globalInit(null, CONFIG_DIR);
+
+ assertFalse(pool.afterCreate(controllerDisabled));
+ assertTrue(managers.isEmpty());
+ }
+
+ @Test(expected = PoolingFeatureRtException.class)
+ public void testAfterCreate_PropertyEx() {
+ managers.clear();
+ pool = new PoolingFeature();
+ pool.globalInit(null, CONFIG_DIR);
+
+ pool.afterCreate(controllerException);
+ }
+
+ @Test(expected = PoolingFeatureRtException.class)
+ public void testAfterCreate_NoProps() {
+ pool = new PoolingFeature();
+
+ // did not perform globalInit, which is an error
+
+ pool.afterCreate(controller1);
+ }
+
+ @Test
+ public void testAfterCreate_NoFeatProps() {
+ managers.clear();
+ pool = new PoolingFeature();
+ pool.globalInit(null, CONFIG_DIR);
+
+ assertFalse(pool.afterCreate(controllerUnknown));
+ assertTrue(managers.isEmpty());
+ }
+
+ @Test
+ public void testBeforeStart() throws Exception {
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1, times(2)).beforeStart();
+
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test
+ public void testAfterStart() {
+ assertFalse(pool.afterStart(controller1));
+ verify(mgr1).afterStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.afterStart(controller1));
+ verify(mgr1, times(2)).afterStart();
+
+ assertFalse(pool.afterStart(controllerDisabled));
+ }
+
+ @Test
+ public void testBeforeStop() {
+ assertFalse(pool.beforeStop(controller1));
+ verify(mgr1).beforeStop();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStop(controller1));
+ verify(mgr1, times(2)).beforeStop();
+
+ assertFalse(pool.beforeStop(controllerDisabled));
+ }
+
+ @Test
+ public void testAfterStop() {
+ assertFalse(pool.afterStop(controller1));
+ verify(mgr1).afterStop();
+
+ // ensure it has been removed from the map by re-invoking
+ assertFalse(pool.afterStop(controller1));
+
+ // count should be unchanged
+ verify(mgr1).afterStop();
+
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test
+ public void testBeforeLock() {
+ assertFalse(pool.beforeLock(controller1));
+ verify(mgr1).beforeLock();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeLock(controller1));
+ verify(mgr1, times(2)).beforeLock();
+
+ assertFalse(pool.beforeLock(controllerDisabled));
+ }
+
+ @Test
+ public void testAfterUnlock() {
+ assertFalse(pool.afterUnlock(controller1));
+ verify(mgr1).afterUnlock();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.afterUnlock(controller1));
+ verify(mgr1, times(2)).afterUnlock();
+
+ assertFalse(pool.afterUnlock(controllerDisabled));
+ }
+
+ @Test
+ public void testBeforeOffer() {
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+
+ // ensure that the args were captured
+ pool.beforeInsert(drools1, OBJECT1);
+ verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+
+ // ensure that the new args were captured
+ pool.beforeInsert(drools1, OBJECT2);
+ verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+
+
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ public void testBeforeOffer_NotFound() {
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ public void testBeforeOffer_MgrTrue() {
+
+ // manager will return true
+ when(mgr1.beforeOffer(any(), any(), any())).thenReturn(true);
+
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+
+ // ensure it's still in the map by re-invoking
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ public void testBeforeInsert() {
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+
+ // ensure it's still in the map by re-invoking
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(drools1, OBJECT2));
+ verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ public void testBeforeInsert_NoArgs() {
+
+ // call beforeInsert without beforeOffer
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testBeforeInsert_ArgEx() {
+
+ // generate exception
+ doThrow(new IllegalArgumentException()).when(factory).getController(any());
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testBeforeInsert_StateEx() {
+
+ // generate exception
+ doThrow(new IllegalStateException()).when(factory).getController(any());
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testBeforeInsert_NullController() {
+
+ // return null controller
+ when(factory.getController(any())).thenReturn(null);
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testBeforeInsert_NotFound() {
+
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ public void testAfterOffer() {
+ // this will create OfferArgs
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+
+ // this should clear them
+ assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
+
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+
+
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ public void testDoManager() throws Exception {
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1, times(2)).beforeStart();
+
+
+ // different controller
+ assertFalse(pool.beforeStart(controller2));
+ verify(mgr2).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller2));
+ verify(mgr2, times(2)).beforeStart();
+
+
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test
+ public void testDoManager_NotFound() {
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test(expected = PoolingFeatureRtException.class)
+ public void testDoManager_Ex() throws Exception {
+
+ // generate exception
+ doThrow(new PoolingFeatureException()).when(mgr1).beforeStart();
+
+ pool.beforeStart(controller1);
+ }
+
+ @Test
+ public void testDoDeleteManager() {
+ assertFalse(pool.afterStop(controller1));
+ verify(mgr1).afterStop();
+
+ // ensure it has been removed from the map by re-invoking
+ assertFalse(pool.afterStop(controller1));
+
+ // count should be unchanged
+ verify(mgr1).afterStop();
+
+
+ // different controller
+ assertFalse(pool.afterStop(controller2));
+ verify(mgr2).afterStop();
+
+ // ensure it has been removed from the map by re-invoking
+ assertFalse(pool.afterStop(controller2));
+
+ // count should be unchanged
+ verify(mgr2).afterStop();
+
+
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test
+ public void testDoDeleteManager_NotFound() {
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test(expected = PoolingFeatureRtException.class)
+ public void testDoDeleteManager_Ex() {
+
+ // generate exception
+ doThrow(new PoolingFeatureRtException()).when(mgr1).afterStop();
+
+ pool.afterStop(controller1);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
new file mode 100644
index 00000000..01ee61ef
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -0,0 +1,1342 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.pooling.PoolingManagerImpl.Factory;
+import org.onap.policy.drools.pooling.extractor.ClassExtractors;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.system.PolicyController;
+
+public class PoolingManagerImplTest {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+
+ private static final String HOST2 = "other.host";
+
+ private static final String MY_CONTROLLER = "my.controller";
+ private static final String MY_TOPIC = "my.topic";
+
+ private static final String TOPIC2 = "topic.two";
+
+ private static final String THE_EVENT = "the event";
+
+ private static final Object DECODED_EVENT = new Object();
+ private static final String REQUEST_ID = "my.request.id";
+
+ /**
+ * Number of dmaap.publish() invocations that should be issued when the
+ * manager is started.
+ */
+ private static final int START_PUB = 1;
+
+ /**
+ * Saved from PoolingManagerImpl and restored on exit from this test class.
+ */
+ private static Factory saveFactory;
+
+ /**
+ * Futures that have been allocated due to calls to scheduleXxx().
+ */
+ private Queue<ScheduledFuture<?>> futures;
+
+ private Properties plainProps;
+ private PoolingProperties poolProps;
+ private ListeningController controller;
+ private EventQueue eventQueue;
+ private ClassExtractors extractors;
+ private DmaapManager dmaap;
+ private ScheduledThreadPoolExecutor sched;
+ private DroolsController drools;
+ private Serializer ser;
+ private Factory factory;
+
+ private PoolingManagerImpl mgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveFactory = PoolingManagerImpl.getFactory();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PoolingManagerImpl.setFactory(saveFactory);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ plainProps = new Properties();
+
+ poolProps = mock(PoolingProperties.class);
+ when(poolProps.getSource()).thenReturn(plainProps);
+ when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
+ when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+
+ futures = new LinkedList<>();
+ ser = new Serializer();
+
+ factory = mock(Factory.class);
+ eventQueue = mock(EventQueue.class);
+ extractors = mock(ClassExtractors.class);
+ dmaap = mock(DmaapManager.class);
+ controller = mock(ListeningController.class);
+ sched = mock(ScheduledThreadPoolExecutor.class);
+ drools = mock(DroolsController.class);
+
+ when(factory.makeEventQueue(any())).thenReturn(eventQueue);
+ when(factory.makeClassExtractors(any())).thenReturn(extractors);
+ when(factory.makeDmaapManager(any())).thenReturn(dmaap);
+ when(factory.makeScheduler()).thenReturn(sched);
+ when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
+ when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
+
+ when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
+
+ when(controller.getName()).thenReturn(MY_CONTROLLER);
+ when(controller.getDrools()).thenReturn(drools);
+ when(controller.isAlive()).thenReturn(true);
+
+ when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
+ .thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ PoolingManagerImpl.setFactory(factory);
+
+ mgr = new PoolingManagerImpl(controller, poolProps);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testPoolingManagerImpl() {
+ mgr = new PoolingManagerImpl(controller, poolProps);
+
+ State st = mgr.getCurrent();
+ assertTrue(st instanceof IdleState);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ public void testPoolingManagerImpl_ClassEx() {
+ /*
+ * this controller does not implement TopicListener, which should cause
+ * a ClassCastException
+ */
+ PolicyController ctlr = mock(PolicyController.class);
+
+ PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
+ xxx -> new PoolingManagerImpl(ctlr, poolProps));
+ assertNotNull(ex.getCause());
+ assertTrue(ex.getCause() instanceof ClassCastException);
+ }
+
+ @Test
+ public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
+ // throw an exception when we try to create the dmaap manager
+ PoolingFeatureException ex = new PoolingFeatureException();
+ when(factory.makeDmaapManager(any())).thenThrow(ex);
+
+ PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
+ xxx -> new PoolingManagerImpl(controller, poolProps));
+ assertEquals(ex, ex2.getCause());
+ }
+
+ @Test
+ public void testGetHost() {
+ String host = mgr.getHost();
+ assertNotNull(host);
+
+ // create another manager and ensure it generates a different host
+ mgr = new PoolingManagerImpl(controller, poolProps);
+
+ assertNotNull(mgr.getHost());
+ assertFalse(host.equals(mgr.getHost()));
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(MY_TOPIC, mgr.getTopic());
+ }
+
+ @Test
+ public void testGetProperties() {
+ assertEquals(poolProps, mgr.getProperties());
+ }
+
+ @Test
+ public void testBeforeStart() throws Exception {
+ // not running yet
+ mgr.beforeStart();
+
+ verify(dmaap).startPublisher();
+
+ verify(factory).makeScheduler();
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+
+ // try again - nothing should happen
+ mgr.beforeStart();
+
+ verify(dmaap).startPublisher();
+
+ verify(factory).makeScheduler();
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+
+ @Test
+ public void testBeforeStart_DmaapEx() throws Exception {
+ // generate an exception
+ PoolingFeatureException ex = new PoolingFeatureException();
+ doThrow(ex).when(dmaap).startPublisher();
+
+ PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, xxx -> mgr.beforeStart());
+ assertEquals(ex, ex2);
+
+ // should never start the scheduler
+ verify(factory, never()).makeScheduler();
+ }
+
+ @Test
+ public void testAfterStart() throws Exception {
+ startMgr();
+
+ verify(dmaap).startConsumer(mgr);
+
+ State st = mgr.getCurrent();
+ assertTrue(st instanceof StartState);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+ verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+
+ // already started - nothing else happens
+ mgr.afterStart();
+
+ verify(dmaap).startConsumer(mgr);
+
+ assertTrue(mgr.getCurrent() instanceof StartState);
+
+ verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testBeforeStop() throws Exception {
+ startMgr();
+
+ mgr.beforeStop();
+
+ verify(dmaap).stopConsumer(mgr);
+ verify(sched).shutdownNow();
+
+ assertTrue(mgr.getCurrent() instanceof IdleState);
+ }
+
+ @Test
+ public void testBeforeStop_NotRunning() throws Exception {
+ State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ verify(dmaap, never()).stopConsumer(any());
+ verify(sched, never()).shutdownNow();
+
+ // hasn't changed states either
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ public void testBeforeStop_AfterPartialStart() throws Exception {
+ // call beforeStart but not afterStart
+ mgr.beforeStart();
+
+ State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ // should still shut the scheduler down
+ verify(sched).shutdownNow();
+
+ verify(dmaap, never()).stopConsumer(any());
+
+ // hasn't changed states
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ public void testAfterStop() throws Exception {
+ startMgr();
+ mgr.beforeStop();
+
+ when(eventQueue.isEmpty()).thenReturn(false);
+ when(eventQueue.size()).thenReturn(3);
+
+ mgr.afterStop();
+
+ verify(eventQueue).clear();
+ verify(dmaap).stopPublisher();
+ }
+
+ @Test
+ public void testAfterStop_EmptyQueue() throws Exception {
+ startMgr();
+ mgr.beforeStop();
+
+ when(eventQueue.isEmpty()).thenReturn(true);
+ when(eventQueue.size()).thenReturn(0);
+
+ mgr.afterStop();
+
+ verify(eventQueue, never()).clear();
+ verify(dmaap).stopPublisher();
+ }
+
+ @Test
+ public void testBeforeLock() throws Exception {
+ startMgr();
+
+ mgr.beforeLock();
+
+ assertTrue(mgr.getCurrent() instanceof IdleState);
+ }
+
+ @Test
+ public void testAfterUnlock_AliveIdle() throws Exception {
+ // this really shouldn't happen
+
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ // stays in idle state, because it has no scheduler
+ assertTrue(mgr.getCurrent() instanceof IdleState);
+ }
+
+ @Test
+ public void testAfterUnlock_AliveStarted() throws Exception {
+ startMgr();
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ assertTrue(mgr.getCurrent() instanceof StartState);
+ }
+
+ @Test
+ public void testAfterUnlock_StoppedIdle() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertTrue(mgr.getCurrent() instanceof IdleState);
+ }
+
+ @Test
+ public void testAfterUnlock_StoppedStarted() throws Exception {
+ startMgr();
+
+ // Note: don't lockMgr()
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertTrue(mgr.getCurrent() instanceof StartState);
+ }
+
+ @Test
+ public void testChangeState() throws Exception {
+ // start should invoke changeState()
+ startMgr();
+
+ int ntimes = 0;
+
+ // should have set the filter for the StartState
+ verify(dmaap, times(++ntimes)).setFilter(any());
+
+ /*
+ * now go offline while it's locked
+ */
+ lockMgr();
+
+ // should have set the new filter
+ verify(dmaap, times(++ntimes)).setFilter(any());
+
+ // should have cancelled the timer
+ assertEquals(1, futures.size());
+ verify(futures.poll()).cancel(false);
+
+ /*
+ * now go back online
+ */
+ unlockMgr();
+
+ // should have set the new filter
+ verify(dmaap, times(++ntimes)).setFilter(any());
+
+ // timer should still be active
+ assertEquals(1, futures.size());
+ verify(futures.poll(), never()).cancel(false);
+ }
+
+ @Test
+ public void testSetFilter() throws Exception {
+ // start should cause a filter to be set
+ startMgr();
+
+ verify(dmaap).setFilter(any());
+ }
+
+ @Test
+ public void testSetFilter_DmaapEx() throws Exception {
+
+ // generate an exception
+ doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
+
+ // start should invoke setFilter()
+ startMgr();
+
+ // no exception, means success
+ }
+
+ @Test
+ public void testInternalTopicFailed() throws Exception {
+ startMgr();
+
+ CountDownLatch latch = mgr.internalTopicFailed();
+
+ // wait for the thread to complete
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+ verify(controller).stop();
+ }
+
+ @Test
+ public void testSchedule() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testScheduleWithFixedDelay() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+ unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testPublishAdmin() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publishAdmin(msg);
+
+ assertEquals(Message.ADMIN, msg.getChannel());
+
+ verify(dmaap).publish(any());
+ }
+
+ @Test
+ public void testPublish() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publish("my.channel", msg);
+
+ assertEquals("my.channel", msg.getChannel());
+
+ verify(dmaap).publish(any());
+ }
+
+ @Test
+ public void testPublish_InvalidMsg() throws Exception {
+ // message is missing data
+ mgr.publish(Message.ADMIN, new Offline());
+
+ // should not have attempted to publish it
+ verify(dmaap, never()).publish(any());
+ }
+
+ @Test
+ public void testPublish_DmaapEx() throws Exception {
+
+ // generate exception
+ doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
+
+ mgr.publish(Message.ADMIN, new Offline(mgr.getHost()));
+ }
+
+ @Test
+ public void testOnTopicEvent() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the
+ * Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertTrue(mgr.getCurrent() instanceof QueryState);
+ }
+
+ @Test
+ public void testOnTopicEvent_NullEvent() throws Exception {
+ startMgr();
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null);
+ }
+
+ @Test
+ public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+ startMgr();
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testBeforeOffer_Locked_NoIntercept() throws Exception {
+ startMgr();
+
+ lockMgr();
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testBeforeOffer_Locked_Intercept() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ CountDownLatch latch = catchRecursion(false);
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+
+ // ensure we made it past both beforeXxx() methods
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testBeforeInsert_Intercept() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ CountDownLatch latch = catchRecursion(true);
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+
+ // ensure we made it past both beforeXxx() methods
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testBeforeInsert_NoIntercept() throws Exception {
+ startMgr();
+
+ long tbegin = System.currentTimeMillis();
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
+ verify(eventQueue).add(msgCap.capture());
+
+ validateMessageContent(tbegin, msgCap.getValue());
+ }
+
+ @Test
+ public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
+ startMgr();
+
+ when(extractors.extract(any())).thenReturn(null);
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
+ startMgr();
+
+ when(extractors.extract(any())).thenReturn("");
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
+ startMgr();
+
+ assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ // should not have tried to enqueue a message
+ verify(eventQueue, never()).add(any());
+ }
+
+ @Test
+ public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
+ startMgr();
+
+ long tbegin = System.currentTimeMillis();
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
+ verify(eventQueue).add(msgCap.capture());
+
+ validateMessageContent(tbegin, msgCap.getValue());
+ }
+
+ @Test
+ public void testHandleExternalForward_NoAssignments() throws Exception {
+ startMgr();
+
+ long tbegin = System.currentTimeMillis();
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
+ verify(eventQueue).add(msgCap.capture());
+
+ validateMessageContent(tbegin, msgCap.getValue());
+ }
+
+ @Test
+ public void testHandleExternalForward() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testHandleEvent_NullTarget() throws Exception {
+ startMgr();
+
+ // buckets have null targets
+ mgr.startDistributing(new BucketAssignments(new String[] {null, null}));
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ }
+
+ @Test
+ public void testHandleEvent_SameHost() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ }
+
+ @Test
+ public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(false));
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
+ mgr.handle(msg);
+
+ // shouldn't publish
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ }
+
+ @Test
+ public void testHandleEvent_DiffHost_Forward() throws Exception {
+ startMgr();
+
+ // route the message to the *OTHER* host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ verify(dmaap, times(START_PUB + 1)).publish(any());
+ }
+
+ @Test
+ public void testExtractRequestId_NullEvent() throws Exception {
+ startMgr();
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
+ }
+
+ @Test
+ public void testExtractRequestId_NullReqId() throws Exception {
+ startMgr();
+
+ when(extractors.extract(any())).thenReturn(null);
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testExtractRequestId() throws Exception {
+ startMgr();
+
+ // route the message to the *OTHER* host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testDecodeEvent_CannotDecode() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(false);
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testDecodeEvent_UnsuppEx() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ // generate exception
+ doThrow(new UnsupportedOperationException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testDecodeEvent_ArgEx() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ // generate exception
+ doThrow(new IllegalArgumentException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testDecodeEvent_StateEx() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ // generate exception
+ doThrow(new IllegalStateException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
+
+ assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testDecodeEvent() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // route to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ public void testMakeForward() throws Exception {
+ startMgr();
+
+ long tbegin = System.currentTimeMillis();
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
+ verify(eventQueue).add(msgCap.capture());
+
+ validateMessageContent(tbegin, msgCap.getValue());
+ }
+
+ @Test
+ public void testMakeForward_InvalidMsg() throws Exception {
+ startMgr();
+
+ assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ // should not have tried to enqueue a message
+ verify(eventQueue, never()).add(any());
+ }
+
+ @Test
+ public void testHandle_SameHost() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ }
+
+ @Test
+ public void testHandle_DiffHost() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(false));
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB + 1)).publish(any());
+ verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ }
+
+ @Test
+ public void testInject() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ CountDownLatch latch = catchRecursion(true);
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+
+ // ensure we made it past both beforeXxx() methods
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testHandleInternal() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the
+ * Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertTrue(mgr.getCurrent() instanceof QueryState);
+ }
+
+ @Test
+ public void testHandleInternal_IOEx() throws Exception {
+ startMgr();
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
+
+ assertTrue(mgr.getCurrent() instanceof StartState);
+ }
+
+ @Test
+ public void testHandleInternal_PoolEx() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+
+ /*
+ * do NOT set the channel - this will cause the message to be invalid,
+ * triggering an exception
+ */
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertTrue(mgr.getCurrent() instanceof StartState);
+ }
+
+ @Test
+ public void testStartDistributing() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+
+ // null assignments should be ignored
+ mgr.startDistributing(null);
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+
+ // route the message to the other host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ @Test
+ public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
+ startMgr();
+
+ // put items in the queue
+ LinkedList<Forward> lst = new LinkedList<>();
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+
+ when(eventQueue.poll()).thenAnswer(args -> lst.poll());
+
+ // route the messages to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ // all of the events should have been processed locally
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ }
+
+ @Test
+ public void testStartDistributing_EventsInQueue_Forward() throws Exception {
+ startMgr();
+
+ // put items in the queue
+ LinkedList<Forward> lst = new LinkedList<>();
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+ lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
+
+ when(eventQueue.poll()).thenAnswer(args -> lst.poll());
+
+ // route the messages to the OTHER host
+ mgr.startDistributing(makeAssignments(false));
+
+ // all of the events should have been forwarded
+ verify(dmaap, times(4)).publish(any());
+ verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ }
+
+ @Test
+ public void testGoStart() {
+ State st = mgr.goStart();
+ assertTrue(st instanceof StartState);
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ public void testGoQuery() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goQuery();
+
+ assertTrue(st instanceof QueryState);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ }
+
+ @Test
+ public void testGoActive() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goActive();
+
+ assertTrue(st instanceof ActiveState);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ }
+
+ @Test
+ public void testGoInactive() {
+ State st = mgr.goInactive();
+ assertTrue(st instanceof InactiveState);
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ public void testTimerActionRun() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testTimerActionRun_DiffState() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // give it a heartbeat so that it transitions to the query state
+ StartState st = (StartState) mgr.getCurrent();
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertTrue(mgr.getCurrent() instanceof QueryState);
+
+ // execute it
+ taskCap.getValue().run();
+
+ // it should NOT have counted down
+ assertEquals(1, latch.getCount());
+ }
+
+ /**
+ * Validates the message content.
+ *
+ * @param tbegin creation time stamp must be no less than this
+ * @param msg message to be validated
+ */
+ private void validateMessageContent(long tbegin, Forward msg) {
+ assertEquals(0, msg.getNumHops());
+ assertTrue(msg.getCreateTimeMs() >= tbegin);
+ assertEquals(mgr.getHost(), msg.getSource());
+ assertEquals(CommInfrastructure.UEB, msg.getProtocol());
+ assertEquals(TOPIC2, msg.getTopic());
+ assertEquals(THE_EVENT, msg.getPayload());
+ assertEquals(REQUEST_ID, msg.getRequestId());
+ }
+
+ /**
+ * Configure the mock controller to act like a real controller, invoking
+ * beforeOffer and then beforeInsert, so we can make sure they pass through.
+ * We'll keep count to ensure we don't get into infinite recursion.
+ *
+ * @param invokeBeforeInsert {@code true} if beforeInsert() should be
+ * invoked, {@code false} if it should be skipped
+ *
+ * @return a latch that will be counted down if both beforeXxx() methods
+ * return false
+ */
+ private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
+ CountDownLatch recursion = new CountDownLatch(3);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ doAnswer(args -> {
+
+ recursion.countDown();
+ if (recursion.getCount() == 0) {
+ fail("recursive calls to onTopicEvent");
+ }
+
+ int iarg = 0;
+ CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class);
+ String topic = args.getArgumentAt(iarg++, String.class);
+ String event = args.getArgumentAt(iarg++, String.class);
+
+ if (mgr.beforeOffer(proto, topic, event)) {
+ return null;
+ }
+
+ if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
+ return null;
+ }
+
+ latch.countDown();
+
+ return null;
+ }).when(controller).onTopicEvent(any(), any(), any());
+
+ return latch;
+ }
+
+ /**
+ * Makes an assignment with two buckets.
+ *
+ * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the
+ * manager's bucket, {@code false} if it should hash to the other
+ * host's bucket
+ * @return a new bucket assignment
+ */
+ private BucketAssignments makeAssignments(boolean sameHost) {
+ int slot = REQUEST_ID.hashCode() % 2;
+
+ // slot numbers are 0 and 1 - reverse them if it's for a different host
+ if (!sameHost) {
+ slot = 1 - slot;
+ }
+
+ String[] asgn = new String[2];
+ asgn[slot] = mgr.getHost();
+ asgn[1 - slot] = HOST2;
+
+ return new BucketAssignments(asgn);
+ }
+
+ /**
+ * Invokes methods necessary to start the manager.
+ *
+ * @throws PoolingFeatureException if an error occurs
+ */
+ private void startMgr() throws PoolingFeatureException {
+ mgr.beforeStart();
+ mgr.afterStart();
+ }
+
+ /**
+ * Invokes methods necessary to lock the manager.
+ */
+ private void lockMgr() {
+ mgr.beforeLock();
+ }
+
+ /**
+ * Invokes methods necessary to unlock the manager.
+ */
+ private void unlockMgr() {
+ mgr.afterUnlock();
+ }
+
+ /**
+ * Used to create a mock object that implements both super interfaces.
+ */
+ private static interface ListeningController extends TopicListener, PolicyController {
+
+ }
+
+ /**
+ * Invokes a method that is expected to throw an exception.
+ *
+ * @param exClass class of exception that is expected
+ * @param func function to invoke
+ * @return the exception that was thrown
+ * @throws AssertionError if no exception was thrown
+ */
+ private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
+ try {
+ func.apply(null);
+ throw new AssertionError("missing exception");
+
+ } catch (Exception e) {
+ return exClass.cast(e);
+ }
+ }
+
+ /**
+ * Function that is expected to throw an exception.
+ *
+ * @param <T> type of exception the function is expected to throw
+ */
+ @FunctionalInterface
+ private static interface ExFunction<T extends Exception> {
+
+ /**
+ * Invokes the function.
+ *
+ * @param arg always {@code null}
+ * @throws T if an error occurs
+ */
+ public void apply(Void arg) throws T;
+
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
new file mode 100644
index 00000000..63eb59d4
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
@@ -0,0 +1,178 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.generalize;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
+import static org.onap.policy.drools.pooling.PoolingProperties.ACTIVE_HEARTBEAT_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.FEATURE_ENABLED;
+import static org.onap.policy.drools.pooling.PoolingProperties.IDENTIFICATION_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.INTER_HEARTBEAT_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_AGE_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_LIMIT;
+import static org.onap.policy.drools.pooling.PoolingProperties.POOLING_TOPIC;
+import static org.onap.policy.drools.pooling.PoolingProperties.REACTIVATE_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.START_HEARTBEAT_MS;
+import java.util.Properties;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+public class PoolingPropertiesTest {
+
+ private static final String CONTROLLER = "a.controller";
+
+ private static final String STD_POOLING_TOPIC = "my.topic";
+ public static final boolean STD_FEATURE_ENABLED = true;
+ public static final int STD_OFFLINE_LIMIT = 10;
+ public static final long STD_OFFLINE_AGE_MS = 1000L;
+ public static final long STD_START_HEARTBEAT_MS = 2000L;
+ public static final long STD_REACTIVATE_MS = 3000L;
+ public static final long STD_IDENTIFICATION_MS = 4000L;
+ public static final long STD_LEADER_MS = 5000L;
+ public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L;
+ public static final long STD_INTER_HEARTBEAT_MS = 7000L;
+
+ private Properties plain;
+ private PoolingProperties pooling;
+
+ @Before
+ public void setUp() throws Exception {
+ plain = makeProperties();
+
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ }
+
+ @Test
+ public void testPoolingProperties() throws PropertyException {
+ // ensure no exceptions
+ new PoolingProperties(CONTROLLER, plain);
+ }
+
+ @Test
+ public void testGetSource() {
+ assertEquals(plain, pooling.getSource());
+ }
+
+ @Test
+ public void testGetPoolingTopic() {
+ assertEquals(STD_POOLING_TOPIC, pooling.getPoolingTopic());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetPoolingTopic_Generalize() {
+ // shouldn't be able to generalize the topic
+ generalize(POOLING_TOPIC);
+ }
+
+ @Test
+ public void testGetOfflineLimit() throws PropertyException {
+ doTest(OFFLINE_LIMIT, STD_OFFLINE_LIMIT, 1000, xxx -> pooling.getOfflineLimit());
+ }
+
+ @Test
+ public void testGetOfflineAgeMs() throws PropertyException {
+ doTest(OFFLINE_AGE_MS, STD_OFFLINE_AGE_MS, 60000L, xxx -> pooling.getOfflineAgeMs());
+ }
+
+ @Test
+ public void testGetStartHeartbeatMs() throws PropertyException {
+ doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 50000L, xxx -> pooling.getStartHeartbeatMs());
+ }
+
+ @Test
+ public void testGetReactivateMs() throws PropertyException {
+ doTest(REACTIVATE_MS, STD_REACTIVATE_MS, 50000L, xxx -> pooling.getReactivateMs());
+ }
+
+ @Test
+ public void testGetIdentificationMs() throws PropertyException {
+ doTest(IDENTIFICATION_MS, STD_IDENTIFICATION_MS, 50000L, xxx -> pooling.getIdentificationMs());
+ }
+
+ @Test
+ public void testGetActiveHeartbeatMs() throws PropertyException {
+ doTest(ACTIVE_HEARTBEAT_MS, STD_ACTIVE_HEARTBEAT_MS, 50000L, xxx -> pooling.getActiveHeartbeatMs());
+ }
+
+ @Test
+ public void testGetInterHeartbeatMs() throws PropertyException {
+ doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
+ }
+
+ /**
+ * Tests a particular property. Verifies that the correct value is returned
+ * if the specialized property has a value or the property has no value.
+ * Also verifies that the property name can be generalized.
+ *
+ * @param propnm name of the property of interest
+ * @param specValue expected specialized value
+ * @param dfltValue expected default value
+ * @param func function to get the field
+ * @throws PropertyException if an error occurs
+ */
+ private <T> void doTest(String propnm, T specValue, T dfltValue, Function<Void, T> func) throws PropertyException {
+ /*
+ * With specialized property
+ */
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ assertEquals("special " + propnm, specValue, func.apply(null));
+
+ /*
+ * Ensure the property supports generalization - this will throw an
+ * exception if it does not.
+ */
+ assertFalse(propnm.equals(generalize(propnm)));
+
+ /*
+ * Without the property - should use the default value.
+ */
+ plain.remove(specialize(propnm, CONTROLLER));
+ plain.remove(generalize(propnm));
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ assertEquals("default " + propnm, dfltValue, func.apply(null));
+ }
+
+ /**
+ * Makes a set of properties, where all of the properties are specialized
+ * for the controller.
+ *
+ * @return a new property set
+ */
+ private Properties makeProperties() {
+ Properties props = new Properties();
+
+ props.setProperty(specialize(POOLING_TOPIC, CONTROLLER), STD_POOLING_TOPIC);
+ props.setProperty(specialize(FEATURE_ENABLED, CONTROLLER), "" + STD_FEATURE_ENABLED);
+ props.setProperty(specialize(OFFLINE_LIMIT, CONTROLLER), "" + STD_OFFLINE_LIMIT);
+ props.setProperty(specialize(OFFLINE_AGE_MS, CONTROLLER), "" + STD_OFFLINE_AGE_MS);
+ props.setProperty(specialize(START_HEARTBEAT_MS, CONTROLLER), "" + STD_START_HEARTBEAT_MS);
+ props.setProperty(specialize(REACTIVATE_MS, CONTROLLER), "" + STD_REACTIVATE_MS);
+ props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS);
+ props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS);
+
+ return props;
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
new file mode 100644
index 00000000..4206a836
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.Map;
+import java.util.TreeMap;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class SerializerTest {
+
+ @Test
+ public void testSerializer() {
+ new Serializer();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testEncodeFilter() throws Exception {
+ Serializer ser = new Serializer();
+
+ /*
+ * Ensure raw maps serialize as expected. Use a TreeMap so the field
+ * order is predictable.
+ */
+ Map<String, Object> top = new TreeMap<>();
+ Map<String, Object> inner = new TreeMap<>();
+ top.put("abc", 20);
+ top.put("def", inner);
+ top.put("ghi", true);
+ inner.put("xyz", 30);
+ assertEquals("{'abc':20,'def':{'xyz':30},'ghi':true}".replace('\'', '"'), ser.encodeFilter(top));
+
+ /*
+ * Ensure we can encode a complicated filter without throwing an
+ * exception
+ */
+ Map<String, Object> complexFilter = makeAnd(makeEquals("fieldC", "valueC"),
+ makeOr(makeEquals("fieldA", "valueA"), makeEquals("fieldB", "valueB")));
+ String val = ser.encodeFilter(complexFilter);
+ assertFalse(val.isEmpty());
+ }
+
+ @Test
+ public void testEncodeMsg_testDecodeMsg() throws Exception {
+ Serializer ser = new Serializer();
+
+ Query msg = new Query("hostA");
+ msg.setChannel("channelB");
+
+ String encoded = ser.encodeMsg(msg);
+ assertNotNull(encoded);
+
+ Message decoded = ser.decodeMsg(encoded);
+ assertEquals(Query.class, decoded.getClass());
+
+ assertEquals(msg.getSource(), decoded.getSource());
+ assertEquals(msg.getChannel(), decoded.getChannel());
+
+ // should work a second time, too
+ encoded = ser.encodeMsg(msg);
+ assertNotNull(encoded);
+
+ decoded = ser.decodeMsg(encoded);
+ assertEquals(Query.class, decoded.getClass());
+
+ assertEquals(msg.getSource(), decoded.getSource());
+ assertEquals(msg.getChannel(), decoded.getChannel());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
new file mode 100644
index 00000000..8b495099
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
@@ -0,0 +1,186 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.util.Properties;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SpecPropertiesTest {
+
+ /**
+ * Property prefix of interest.
+ */
+ private static final String MY_PREFIX = "my.prefix";
+
+ /**
+ * Specialization, which follows the prefix.
+ */
+ private static final String MY_SPEC = "my.spec";
+
+ /**
+ * Generalized prefix (i.e., without the spec).
+ */
+ private static final String PREFIX_GEN = MY_PREFIX + ".";
+
+ /**
+ * Specialized prefix (i.e., with the spec).
+ */
+ private static final String PREFIX_SPEC = PREFIX_GEN + MY_SPEC + ".";
+
+ /**
+ * Suffix to add to property names to generate names of properties that are
+ * not populated.
+ */
+ private static final String SUFFIX = ".suffix";
+
+ /**
+ * Property name without a prefix.
+ */
+ private static final String PROP_NO_PREFIX = "other";
+
+ /**
+ * Generalized property name (i.e., without the spec).
+ */
+ private static final String PROP_GEN = PREFIX_GEN + "generalized";
+
+ // property names that include the spec
+ private static final String PROP_SPEC = PREFIX_SPEC + "specialized";
+ private static final String PROP_UNKNOWN = PREFIX_SPEC + "unknown";
+
+ // property values
+ private static final String VAL_NO_PREFIX = "no-prefix";
+ private static final String VAL_GEN = "gen";
+ private static final String VAL_SPEC = "spec";
+
+ private static final String VAL_DEFAULT = "default value";
+
+ private Properties supportingProps;
+ private SpecProperties props;
+
+ @Before
+ public void setUp() {
+ supportingProps = new Properties();
+
+ supportingProps.setProperty(PROP_NO_PREFIX, VAL_NO_PREFIX);
+ supportingProps.setProperty(PROP_GEN, VAL_GEN);
+ supportingProps.setProperty(PROP_SPEC, VAL_SPEC);
+
+ props = new SpecProperties(MY_PREFIX, MY_SPEC);
+
+ props.putAll(supportingProps);
+ }
+
+ @Test
+ public void testSpecPropertiesStringString() {
+
+ // no supporting properties
+ props = new SpecProperties(MY_PREFIX, MY_SPEC);
+
+ assertEquals(PREFIX_GEN, props.getPrefix());
+ assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+ // everything is null
+ assertNull(props.getProperty(gen(PROP_NO_PREFIX)));
+ assertNull(props.getProperty(gen(PROP_GEN)));
+ assertNull(props.getProperty(gen(PROP_SPEC)));
+ assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+ }
+
+ @Test
+ public void testSpecPropertiesStringStringProperties() {
+
+ // use supportingProps as default properties
+ props = new SpecProperties(MY_PREFIX, MY_SPEC, supportingProps);
+
+ assertEquals(PREFIX_GEN, props.getPrefix());
+ assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+ assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX)));
+ assertEquals(VAL_GEN, props.getProperty(gen(PROP_GEN)));
+ assertEquals(VAL_SPEC, props.getProperty(gen(PROP_SPEC)));
+ assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+ }
+
+ @Test
+ public void testWithTrailingDot() {
+ // neither has trailing dot
+ assertEquals(PREFIX_GEN, props.getPrefix());
+ assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+ // both have trailing dot
+ props = new SpecProperties(PREFIX_GEN, MY_SPEC + ".");
+ assertEquals(PREFIX_GEN, props.getPrefix());
+ assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+ }
+
+ @Test
+ public void testGetPropertyString() {
+ // the key does contain the prefix
+ assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX)));
+ assertNull(props.getProperty(gen(PROP_NO_PREFIX + SUFFIX)));
+
+ // specialized value exists
+ assertEquals(VAL_GEN, props.getProperty(gen(PROP_GEN)));
+ assertNull(props.getProperty(gen(PROP_GEN + SUFFIX)));
+
+ // generalized value exists
+ assertEquals(VAL_SPEC, props.getProperty(gen(PROP_SPEC)));
+ assertNull(props.getProperty(gen(PROP_SPEC + SUFFIX)));
+
+ // not found
+ assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+ assertNull(props.getProperty(gen(PROP_UNKNOWN + SUFFIX)));
+ }
+
+ @Test
+ public void testGetPropertyStringString() {
+ // the key does contain the prefix
+ assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX), VAL_DEFAULT));
+ assertEquals(VAL_DEFAULT, props.getProperty(gen(PROP_NO_PREFIX + SUFFIX), VAL_DEFAULT));
+
+ // specialized value exists
+ assertEquals(VAL_GEN, props.getProperty(gen(PROP_GEN), VAL_DEFAULT));
+ assertEquals(VAL_DEFAULT, props.getProperty(gen(PROP_GEN + SUFFIX), VAL_DEFAULT));
+
+ // generalized value exists
+ assertEquals(VAL_SPEC, props.getProperty(gen(PROP_SPEC), VAL_DEFAULT));
+ assertEquals(VAL_DEFAULT, props.getProperty(gen(PROP_SPEC + SUFFIX), VAL_DEFAULT));
+
+ // not found
+ assertEquals(VAL_DEFAULT, props.getProperty(gen(PROP_UNKNOWN), VAL_DEFAULT));
+ assertEquals(VAL_DEFAULT, props.getProperty(gen(PROP_UNKNOWN + SUFFIX), VAL_DEFAULT));
+
+ // can return null
+ assertNull(props.getProperty(gen(PROP_UNKNOWN), null));
+ }
+
+ private String gen(String propnm) {
+ if (propnm.startsWith(PREFIX_SPEC)) {
+ return PREFIX_GEN + propnm.substring(PREFIX_SPEC.length());
+ }
+
+ return propnm;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
new file mode 100644
index 00000000..e9246430
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
@@ -0,0 +1,440 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClassExtractorsTest {
+
+ private static final int NTIMES = 5;
+
+ private static final String MY_TYPE = "theType";
+ private static final String PROP_PREFIX = "extractor." + MY_TYPE + ".";
+
+ private static final String VALUE = "a value";
+ private static final Integer INT_VALUE = 10;
+ private static final Integer INT_VALUE2 = 20;
+
+ private Properties props;
+ private ClassExtractors map;
+
+ @Before
+ public void setUp() {
+ props = new Properties();
+
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
+ props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
+
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ }
+
+ @Test
+ public void testExtract() {
+ Simple obj = new Simple();
+ assertEquals(INT_VALUE, map.extract(obj));
+
+ // string value
+ assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
+
+ // null object
+ assertNull(map.extract(null));
+
+ // values from two different kinds of objects
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
+ props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ assertEquals(INT_VALUE, map.extract(new Simple()));
+ assertEquals(VALUE, map.extract(new Sub()));
+
+ // values from a superclass method, but property defined for subclass
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ assertEquals(VALUE, map.extract(new Sub()));
+
+ // values from a superclass field, but property defined for subclass
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + Sub.class.getName(), "${intValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ assertEquals(INT_VALUE, map.extract(new Sub()));
+
+
+ // prefix includes trailing "."
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
+ map = new ClassExtractors(props, PROP_PREFIX.substring(0, PROP_PREFIX.length() - 1), MY_TYPE);
+ assertEquals(INT_VALUE, map.extract(new Simple()));
+
+
+ // values from an class in a different file
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + ClassExtractorsTestSupport.class.getName(), "${nested.theValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ assertEquals(ClassExtractorsTestSupport2.NESTED_VALUE, map.extract(new ClassExtractorsTestSupport()));
+ }
+
+ @Test
+ public void testGetExtractor() {
+ Simple obj = new Simple();
+
+ // repeat - shouldn't re-create the extractor
+ for (int x = 0; x < NTIMES; ++x) {
+ assertEquals("x=" + x, INT_VALUE, map.extract(obj));
+ assertEquals("x=" + x, 1, map.size());
+ }
+ }
+
+ @Test
+ public void testBuildExtractorClass_TopLevel() {
+ // extractor defined for top-level class
+ props = new Properties();
+ props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
+
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals(VALUE, map.extract(new Sub()));
+
+ // one extractor for top-level class
+ assertEquals(1, map.size());
+ }
+
+ @Test
+ public void testBuildExtractorClass_SuperClass() {
+ // extractor defined for superclass (interface)
+ assertEquals(VALUE, map.extract(new Sub()));
+
+ // one extractor for top-level class and one for interface
+ assertEquals(2, map.size());
+ }
+
+ @Test
+ public void testBuildExtractorClass_NotDefined() {
+ // no extractor defined for "this" class
+ assertNull(map.extract(this));
+
+ // one NULL extractor for top-level class
+ assertEquals(1, map.size());
+ }
+
+ @Test
+ public void testBuildExtractorClassString() {
+ // no leading "${"
+ assertNull(tryIt(Simple.class, "intValue}", xxx -> new Simple()));
+
+ // no trailing "}"
+ assertNull(tryIt(Simple.class, "${intValue", xxx -> new Simple()));
+
+ // leading "."
+ assertNull(tryIt(Sub.class, "${.simple.strValue}", xxx -> new Sub()));
+
+ // trailing "."
+ assertNull(tryIt(Sub.class, "${simple.strValue.}", xxx -> new Sub()));
+
+ // one component
+ assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
+
+ // two components
+ assertEquals(VALUE, tryIt(Sub.class, "${simple.strValue}", xxx -> new Sub()));
+
+ // invalid component
+ assertNull(tryIt(Sub.class, "${unknown}", xxx -> new Sub()));
+ }
+
+ @Test
+ public void testGetClassExtractor_InSuper() {
+ // field in the superclass
+ assertEquals(INT_VALUE, tryIt(Super.class, "${intValue}", xxx -> new Sub()));
+ }
+
+ @Test
+ public void testGetClassExtractor_InInterface() {
+ // defined in the interface
+ assertEquals(VALUE, map.extract(new Sub()));
+ }
+
+ @Test
+ public void testNullExtractorExtract() {
+ // empty properties - should only create NullExtractor
+ map = new ClassExtractors(new Properties(), PROP_PREFIX, MY_TYPE);
+
+ Simple obj = new Simple();
+
+ // repeat - shouldn't re-create the extractor
+ for (int x = 0; x < NTIMES; ++x) {
+ assertNull("x=" + x, map.extract(obj));
+ assertEquals("x=" + x, 1, map.size());
+ }
+ }
+
+ @Test
+ public void testComponetizedExtractor() {
+ // one component
+ assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
+
+ // three components
+ assertEquals(VALUE, tryIt(Sub.class, "${cont.data.strValue}", xxx -> new Sub()));
+ }
+
+ @Test
+ public void testComponetizedExtractorBuildExtractor_Method() {
+ assertEquals(INT_VALUE, tryIt(Simple.class, "${intValue}", xxx -> new Simple()));
+ }
+
+ @Test
+ public void testComponetizedExtractorBuildExtractor_Field() {
+ assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
+ }
+
+ @Test
+ public void testComponetizedExtractorBuildExtractor_Map() {
+ Map<String, Object> inner = new TreeMap<>();
+ inner.put("inner1", "abc1");
+ inner.put("inner2", "abc2");
+
+ Map<String, Object> outer = new TreeMap<>();
+ outer.put("outer1", "def1");
+ outer.put("outer2", inner);
+
+ Simple obj = new Simple();
+
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals(null, map.extract(obj));
+
+ obj.mapValue = outer;
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals("abc2", map.extract(obj));
+ }
+
+ @Test
+ public void testComponetizedExtractorBuildExtractor_Unknown() {
+ assertNull(tryIt(Simple.class, "${unknown2}", xxx -> new Simple()));
+ }
+
+ @Test
+ public void testComponetizedExtractorExtract_MiddleNull() {
+ // data component is null
+ assertEquals(null, tryIt(Sub.class, "${cont.data.strValue}", xxx -> {
+ Sub obj = new Sub();
+ obj.cont.simpleValue = null;
+ return obj;
+ }));
+ }
+
+ @Test
+ public void testComponetizedExtractorGetMethodExtractor_VoidMethod() {
+ // tell it to use getVoidValue()
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${voidValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ Simple obj = new Simple();
+ assertNull(map.extract(obj));
+
+ assertFalse(obj.voidInvoked);
+ }
+
+ @Test
+ public void testComponetizedExtractorGetMethodExtractor() {
+ assertEquals(INT_VALUE, map.extract(new Simple()));
+ }
+
+ @Test
+ public void testComponetizedExtractorGetFieldExtractor() {
+ // use a field
+ assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
+ }
+
+ @Test
+ public void testComponetizedExtractorGetMapExtractor() {
+ Map<String, Object> inner = new TreeMap<>();
+ inner.put("inner1", "abc1");
+ inner.put("inner2", "abc2");
+
+ Map<String, Object> outer = new TreeMap<>();
+ outer.put("outer1", "def1");
+ outer.put("outer2", inner);
+
+ Simple obj = new Simple();
+
+ obj.mapValue = outer;
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals("abc2", map.extract(obj));
+ }
+
+ @Test
+ public void testComponetizedExtractorGetMapExtractor_MapSubclass() {
+ Map<String, Object> inner = new TreeMap<>();
+ inner.put("inner1", "abc1");
+ inner.put("inner2", "abc2");
+
+ MapSubclass outer = new MapSubclass();
+ outer.put("outer1", "def1");
+ outer.put("outer2", inner);
+
+ Simple obj = new Simple();
+
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals(null, map.extract(obj));
+
+ obj.mapValue = outer;
+ props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+ assertEquals("abc2", map.extract(obj));
+ }
+
+ /**
+ * Sets a property for the given class, makes an object, and then returns
+ * the value extracted.
+ *
+ * @param clazz class whose property is to be set
+ * @param propval value to which to set the property
+ * @param makeObj function to create the object whose data is to be
+ * extracted
+ * @return the extracted data, or {@code null} if nothing was extracted
+ */
+ private Object tryIt(Class<?> clazz, String propval, Function<Void, Object> makeObj) {
+ Properties props = new Properties();
+ props.setProperty(PROP_PREFIX + clazz.getName(), propval);
+
+ map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
+
+ return map.extract(makeObj.apply(null));
+ }
+
+ /**
+ * A Map subclass, used to verify that getMapExtractor() still handles it.
+ */
+ private static class MapSubclass extends TreeMap<String, Object> {
+ private static final long serialVersionUID = 1L;
+
+ }
+
+ /**
+ * A simple class.
+ */
+ private static class Simple {
+
+ /**
+ * This will not be used because getIntValue() will override it.
+ */
+ @SuppressWarnings("unused")
+ private int intValue = INT_VALUE2;
+
+ /**
+ * Used to verify retrieval via a field name.
+ */
+ @SuppressWarnings("unused")
+ private String strValue = VALUE;
+
+ /**
+ * Used to verify retrieval within maps.
+ */
+ @SuppressWarnings("unused")
+ private Map<String, Object> mapValue = null;
+
+ /**
+ * {@code True} if {@link #getVoidValue()} was invoked, {@code false}
+ * otherwise.
+ */
+ private boolean voidInvoked = false;
+
+ /**
+ * This function will supercede the value in the "intValue" field.
+ *
+ * @return INT_VALUE
+ */
+ @SuppressWarnings("unused")
+ public Integer getIntValue() {
+ return INT_VALUE;
+ }
+
+ /**
+ * Used to verify that void functions are not invoked.
+ */
+ @SuppressWarnings("unused")
+ public void getVoidValue() {
+ voidInvoked = true;
+ }
+ }
+
+ /**
+ * Used to verify multi-component retrieval.
+ */
+ private static class Container {
+ private Simple simpleValue = new Simple();
+
+ @SuppressWarnings("unused")
+ public Simple getData() {
+ return simpleValue;
+ }
+ }
+
+ /**
+ * Used to verify extraction when the property refers to an interface.
+ */
+ private static interface WithString {
+
+ public String getStrValue();
+ }
+
+ /**
+ * Used to verify retrieval within a superclass.
+ */
+ private static class Super implements WithString {
+
+ @SuppressWarnings("unused")
+ private int intValue = INT_VALUE;
+
+ @Override
+ public String getStrValue() {
+ return VALUE;
+ }
+ }
+
+ /**
+ * Used to verify retrieval within a subclass.
+ */
+ private static class Sub extends Super {
+
+ @SuppressWarnings("unused")
+ private Simple simple = new Simple();
+
+ /**
+ * Used to verify multi-component retrieval.
+ */
+ private Container cont = new Container();
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
new file mode 100644
index 00000000..be8d6c26
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+/**
+ * Used to test extractors.
+ */
+public class ClassExtractorsTestSupport {
+
+ private ClassExtractorsTestSupport2 nested = new ClassExtractorsTestSupport2();
+
+ /**
+ *
+ */
+ public ClassExtractorsTestSupport() {
+ super();
+ }
+
+ protected ClassExtractorsTestSupport2 getNested() {
+ return nested;
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
new file mode 100644
index 00000000..6941d033
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+/**
+ * Used to test extractors.
+ */
+public class ClassExtractorsTestSupport2 {
+
+ public static final int NESTED_VALUE = 30;
+
+ @SuppressWarnings("unused")
+ private int theValue = NESTED_VALUE;
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
new file mode 100644
index 00000000..d1458de7
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+
+public class ExtractorExceptionTest extends ExceptionsTester {
+
+ @Test
+ public void test() {
+ assertEquals(5, test(ExtractorException.class));
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
new file mode 100644
index 00000000..6fc2e20e
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.lang.reflect.Field;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FieldExtractorTest {
+
+ private static final String VALUE = "the value";
+ private static final Integer INT_VALUE = 10;
+
+ private Field field;
+ private FieldExtractor ext;
+
+ @Before
+ public void setUp() throws Exception {
+ field = MyClass.class.getDeclaredField("value");
+ ext = new FieldExtractor(field);
+ }
+
+ @Test
+ public void testExtract() throws Exception {
+ assertEquals(VALUE, ext.extract(new MyClass()));
+
+ // repeat
+ assertEquals(VALUE, ext.extract(new MyClass()));
+
+ // null value
+ MyClass obj = new MyClass();
+ obj.value = null;
+ assertEquals(null, ext.extract(obj));
+
+ obj.value = VALUE + "X";
+ assertEquals(VALUE + "X", ext.extract(obj));
+
+ // different value type
+ field = MyClass.class.getDeclaredField("value2");
+ ext = new FieldExtractor(field);
+ assertEquals(INT_VALUE, ext.extract(new MyClass()));
+ }
+
+ @Test
+ public void testExtract_ArgEx() {
+ // pass it the wrong class type
+ assertNull(ext.extract(this));
+ }
+
+ private static class MyClass {
+ @SuppressWarnings("unused")
+ private String value = VALUE;
+
+ @SuppressWarnings("unused")
+ private int value2 = INT_VALUE;
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
new file mode 100644
index 00000000..48985bf3
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MapExtractorTest {
+ private static final String KEY = "a.key";
+ private static final String VALUE = "a.value";
+
+ private MapExtractor ext;
+
+ @Before
+ public void setUp() {
+ ext = new MapExtractor(KEY);
+ }
+
+ @Test
+ public void testExtract_NotAMap() {
+
+ // object is not a map (i.e., it's a String)
+ assertNull(ext.extract(KEY));
+ }
+
+ @Test
+ public void testExtract_MissingValue() {
+
+ Map<String,Object> map = new HashMap<>();
+ map.put(KEY+"x", VALUE+"x");
+
+ // object is a map, but doesn't have the key
+ assertNull(ext.extract(map));
+ }
+
+ @Test
+ public void testExtract() {
+
+ Map<String,Object> map = new HashMap<>();
+ map.put(KEY+"x", VALUE+"x");
+ map.put(KEY, VALUE);
+
+ // object is a map and contains the key
+ assertEquals(VALUE, ext.extract(map));
+
+ // change to value to a different type
+ map.put(KEY, 20);
+ assertEquals(20, ext.extract(map));
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
new file mode 100644
index 00000000..ae5858e7
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.lang.reflect.Method;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MethodExtractorTest {
+
+ private static final String VALUE = "the value";
+ private static final Integer INT_VALUE = 10;
+
+ private Method meth;
+ private MethodExtractor ext;
+
+ @Before
+ public void setUp() throws Exception {
+ meth = MyClass.class.getMethod("getValue");
+ ext = new MethodExtractor(meth);
+ }
+
+ @Test
+ public void testExtract() throws Exception {
+ assertEquals(VALUE, ext.extract(new MyClass()));
+
+ // repeat
+ assertEquals(VALUE, ext.extract(new MyClass()));
+
+ // null value
+ MyClass obj = new MyClass();
+ meth = MyClass.class.getMethod("getNullValue");
+ ext = new MethodExtractor(meth);
+ assertEquals(null, ext.extract(obj));
+
+ // different value type
+ meth = MyClass.class.getMethod("getIntValue");
+ ext = new MethodExtractor(meth);
+ assertEquals(INT_VALUE, ext.extract(new MyClass()));
+ }
+
+ @Test
+ public void testExtract_ArgEx() {
+ // pass it the wrong class type
+ assertNull(ext.extract(this));
+ }
+
+ @Test
+ public void testExtract_InvokeEx() throws Exception {
+ // invoke method that throws an exception
+ meth = MyClass.class.getMethod("throwException");
+ ext = new MethodExtractor(meth);
+ assertEquals(null, ext.extract(new MyClass()));
+ }
+
+ private static class MyClass {
+
+ @SuppressWarnings("unused")
+ public String getValue() {
+ return VALUE;
+ }
+
+ @SuppressWarnings("unused")
+ public int getIntValue() {
+ return INT_VALUE;
+ }
+
+ @SuppressWarnings("unused")
+ public String getNullValue() {
+ return null;
+ }
+
+ @SuppressWarnings("unused")
+ public String throwException() {
+ throw new IllegalStateException("expected");
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties
new file mode 100644
index 00000000..a4b5bc76
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties
@@ -0,0 +1,33 @@
+
+pooling.controllerA.topic = topic.A
+pooling.controllerA.enabled = true
+pooling.controllerA.offline.queue.limit = 5
+pooling.controllerA.offline.queue.age.milliseconds = 100
+pooling.controllerA.start.heartbeat.milliseconds = 10
+pooling.controllerA.reactivate.milliseconds = 20
+pooling.controllerA.identification.milliseconds = 30
+pooling.controllerA.active.heartbeat.milliseconds = 40
+pooling.controllerA.inter.heartbeat.milliseconds = 50
+
+pooling.controllerB.topic = topic.B
+pooling.controllerB.enabled = true
+pooling.controllerB.offline.queue.limit = 6
+pooling.controllerB.offline.queue.age.milliseconds = 101
+pooling.controllerB.start.heartbeat.milliseconds = 11
+pooling.controllerB.reactivate.milliseconds = 21
+pooling.controllerB.identification.milliseconds = 31
+pooling.controllerB.active.heartbeat.milliseconds = 41
+pooling.controllerB.inter.heartbeat.milliseconds = 51
+
+pooling.controllerDisabled.enabled = false
+
+# this has an invalid property
+pooling.controllerException.topic = topic.B
+pooling.controllerException.enabled = true
+pooling.controllerException.offline.queue.limit = INVALID NUMBER
+pooling.controllerException.offline.queue.age.milliseconds = 101
+pooling.controllerException.start.heartbeat.milliseconds = 11
+pooling.controllerException.reactivate.milliseconds = 21
+pooling.controllerException.identification.milliseconds = 31
+pooling.controllerException.active.heartbeat.milliseconds = 41
+pooling.controllerException.inter.heartbeat.milliseconds = 51
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BasicMessageTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BasicMessageTester.java
new file mode 100644
index 00000000..69d7e67c
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BasicMessageTester.java
@@ -0,0 +1,245 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Superclass used to test subclasses of {@link Message}.
+ *
+ * @param <T> type of {@link Message} subclass that this tests
+ */
+public abstract class BasicMessageTester<T extends Message> {
+ // values set by makeValidMessage()
+ public static final String VALID_HOST_PREDECESSOR = "hostA";
+ public static final String VALID_HOST = "hostB";
+ public static final String VALID_CHANNEL = "channelC";
+
+ /**
+ * Used to perform JSON serialization and de-serialization.
+ */
+ public final ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * The subclass of the type of Message being tested.
+ */
+ private final Class<T> subclazz;
+
+ /**
+ *
+ * @param subclazz subclass of {@link Message} being tested
+ */
+ public BasicMessageTester(Class<T> subclazz) {
+ this.subclazz = subclazz;
+ }
+
+ /**
+ * Creates a default Message and verifies that the source and channel are
+ * {@code null}.
+ *
+ * @return the default Message
+ */
+ @Test
+ public final void testDefaultConstructor() {
+ testDefaultConstructorFields(makeDefaultMessage());
+ }
+
+ /**
+ * Tests that the Message has the correct source, and that the channel is
+ * {@code null}.
+ *
+ * @param msg message to be checked
+ * @param expectedSource what the source is expected to be
+ */
+ @Test
+ public final void testConstructorWithArgs() {
+ testValidFields(makeValidMessage());
+ }
+
+ /**
+ * Makes a valid message and then verifies that it can be serialized and
+ * de-serialized. Verifies that the de-serialized message is of the same
+ * type, and has the same content, as the original.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public final void testJsonEncodeDecode() throws Exception {
+ T originalMsg = makeValidMessage();
+
+ Message msg = mapper.readValue(mapper.writeValueAsString(originalMsg), Message.class);
+ assertEquals(subclazz, msg.getClass());
+
+ msg.checkValidity();
+
+ testValidFields(subclazz.cast(msg));
+ }
+
+ /**
+ * Creates a valid Message and verifies that checkValidity() passes.
+ *
+ * @throws PoolingFeatureException if an error occurs
+ */
+ @Test
+ public final void testCheckValidity_Ok() throws PoolingFeatureException {
+ T msg = makeValidMessage();
+ msg.checkValidity();
+
+ testValidFields(subclazz.cast(msg));
+ }
+
+ /**
+ * Creates a default Message and verifies that checkValidity() fails. Does
+ * not throw an exception.
+ */
+ @Test
+ public final void testCheckValidity_DefaultConstructor() {
+ try {
+ makeDefaultMessage().checkValidity();
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+ /**
+ * Creates a message via {@link #makeValidMessage()}, updates it via the
+ * given function, and then invokes the checkValidity() method on it. It is
+ * expected that the checkValidity() will throw an exception.
+ *
+ * @param func function to update the message prior to invoking
+ * checkValidity()
+ */
+ public void expectCheckValidityFailure(MessageUpdateFunction<T> func) {
+ try {
+ T msg = makeValidMessage();
+ func.update(msg);
+
+ msg.checkValidity();
+
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+ /**
+ * Creates a message via {@link #makeValidMessage()}, updates one of its
+ * fields via the given function, and then invokes the checkValidity()
+ * method on it. It is expected that the checkValidity() will throw an
+ * exception. It checks both the case when the message's field is set to
+ * {@code null}, and when it is set to empty (i.e., "").
+ *
+ * @param func function to update the message's field prior to invoking
+ * checkValidity()
+ */
+ public void expectCheckValidityFailure_NullOrEmpty(MessageFieldUpdateFunction<T> func) {
+ expectCheckValidityFailure(msg -> func.update(msg, null));
+ expectCheckValidityFailure(msg -> func.update(msg, ""));
+ }
+
+ /**
+ * Makes a message using the default constructor.
+ *
+ * @return a new Message
+ */
+ public final T makeDefaultMessage() {
+ try {
+ return subclazz.getConstructor().newInstance();
+
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+
+
+ // the remaining methods will typically be overridden
+
+ /**
+ * Makes a message that will pass the validity check. Note: this should use
+ * the non-default constructor, and the source and channel should be set to
+ * {@link VALID_HOST} and {@link VALID_CHANNEL}, respectively.
+ *
+ * @return a valid Message
+ */
+ public abstract T makeValidMessage();
+
+ /**
+ * Verifies that fields are set as expected by
+ * {@link #makeDefaultMessage()}.
+ *
+ * @param msg the default Message
+ */
+ public void testDefaultConstructorFields(T msg) {
+ assertNull(msg.getSource());
+ assertNull(msg.getChannel());
+ }
+
+ /**
+ * Verifies that fields are set as expected by {@link #makeValidMessage()}.
+ *
+ * @param msg message whose fields are to be validated
+ */
+ public void testValidFields(T msg) {
+ assertEquals(VALID_HOST, msg.getSource());
+ assertEquals(VALID_CHANNEL, msg.getChannel());
+ }
+
+ /**
+ * Function that updates a message.
+ *
+ * @param <T> type of Message the function updates
+ */
+ @FunctionalInterface
+ public static interface MessageUpdateFunction<T extends Message> {
+
+ /**
+ * Updates a message.
+ *
+ * @param msg message to be updated
+ */
+ public void update(T msg);
+ }
+
+ /**
+ * Function that updates a single field within a message.
+ *
+ * @param <T> type of Message the function updates
+ */
+ @FunctionalInterface
+ public static interface MessageFieldUpdateFunction<T extends Message> {
+
+ /**
+ * Updates a field within a message.
+ *
+ * @param msg message to be updated
+ * @param newValue new field value
+ */
+ public void update(T msg, String newValue);
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
new file mode 100644
index 00000000..ef03d4d6
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
@@ -0,0 +1,351 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+
+public class BucketAssignmentsTest {
+
+ @Test
+ public void testBucketAssignments() {
+ new BucketAssignments();
+ }
+
+ @Test
+ public void testBucketAssignmentsStringArray() {
+ String arr[] = {"abc", "def"};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(arr.toString(), asgn.getHostArray().toString());
+ }
+
+ @Test
+ public void testGetHostArray_testSetHostArray() {
+
+ String arr[] = {"abc", "def"};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(arr.toString(), asgn.getHostArray().toString());
+
+ String arr2[] = {"xyz"};
+ asgn.setHostArray(arr2);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(arr2.toString(), asgn.getHostArray().toString());
+ }
+
+ @Test
+ public void testGetLeader() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertNull(asgn.getLeader());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertNull(asgn.getLeader());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertNull(asgn.getLeader());
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertEquals("abc", asgn.getLeader());
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertEquals("abc", asgn.getLeader());
+
+ // first is least
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", "Chost"});
+ assertEquals("Ahost", asgn.getLeader());
+
+ // middle is least
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost"});
+ assertEquals("Bhost", asgn.getLeader());
+
+ // last is least
+ asgn.setHostArray(new String[] {"Xhost", "Yhost", "Chost"});
+ assertEquals("Chost", asgn.getLeader());
+
+ // multiple entries
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost", "Bhost", "Xhost", "Chost"});
+ assertEquals("Bhost", asgn.getLeader());
+ }
+
+ @Test
+ public void testHasAssignment() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears as first entry
+ asgn.setHostArray(new String[] {"abc", "Bhost", "Chost"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears in middle
+ asgn.setHostArray(new String[] {"Xhost", "abc", "Chost"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears last
+ asgn.setHostArray(new String[] {"Xhost", "Yhost", "abc"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears repeatedly
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost", "Bhost", "Xhost", "Chost"});
+ assertTrue(asgn.hasAssignment("Bhost"));
+ }
+
+ @Test
+ public void testGetAllHosts() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertEquals("[abc]", getSortedHosts(asgn).toString());
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertEquals("[abc]", getSortedHosts(asgn).toString());
+
+ // multiple, repeated entries
+ asgn.setHostArray(new String[] {"def", "abc", "def", "ghi", "def", "def", "xyz"});
+ assertEquals("[abc, def, ghi, xyz]", getSortedHosts(asgn).toString());
+ }
+
+ /**
+ * Gets the hosts, sorted, so that the order is predictable.
+ *
+ * @param asgn assignment whose hosts are to be retrieved
+ * @return a new, sorted set of hosts
+ */
+ private SortedSet<String> getSortedHosts(BucketAssignments asgn) {
+ return new TreeSet<>(asgn.getAllHosts());
+ }
+
+ @Test
+ public void testGetAssignedHost() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertNull(asgn.getAssignedHost(3));
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertNull(asgn.getAssignedHost(3));
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertNull(asgn.getAssignedHost(3));
+
+ // multiple, repeated entries
+ String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"};
+ asgn.setHostArray(arr);
+
+ for (int x = 0; x < arr.length; ++x) {
+ assertEquals("x=" + x, arr[x], asgn.getAssignedHost(x));
+ }
+
+ // negative
+ assertNull(asgn.getAssignedHost(-1));
+
+ // beyond end
+ assertNull(asgn.getAssignedHost(arr.length));
+ assertNull(asgn.getAssignedHost(arr.length + 1));
+ }
+
+ @Test
+ public void testSize() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertEquals(0, asgn.size());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertEquals(0, asgn.size());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertEquals(5, asgn.size());
+
+ // multiple, repeated entries
+ String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"};
+ asgn.setHostArray(arr);
+ assertEquals(arr.length, asgn.size());
+ }
+
+ @Test
+ public void testCheckValidity() throws Exception {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ expectException(asgn);
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ expectException(asgn);
+
+ // array is too big
+ asgn.setHostArray(new String[BucketAssignments.MAX_BUCKETS + 1]);
+ expectException(asgn);
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ expectException(asgn);
+
+ // null at the beginning
+ asgn.setHostArray(new String[] {null, "Bhost", "Chost"});
+ expectException(asgn);
+
+ // null in the middle
+ asgn.setHostArray(new String[] {"Ahost", null, "Chost"});
+ expectException(asgn);
+
+ // null at the end
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", null});
+ expectException(asgn);
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ asgn.checkValidity();
+
+ // multiple entries
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", "Chost"});
+ asgn.checkValidity();
+ }
+
+ @Test
+ public void testHashCode() {
+ // with null assignments
+ BucketAssignments asgn = new BucketAssignments();
+ asgn.hashCode();
+
+ // with empty array
+ asgn = new BucketAssignments(new String[0]);
+ asgn.hashCode();
+
+ // with null items
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ asgn.hashCode();
+
+ // same assignments
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ int code = asgn.hashCode();
+
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ assertEquals(code, asgn.hashCode());
+
+ // slightly different values (i.e., changed "def" to "eef")
+ asgn = new BucketAssignments(new String[] {"abc", null, "eef"});
+ assertTrue(code != asgn.hashCode());
+ }
+
+ @Test
+ public void testEquals() {
+ // null object
+ BucketAssignments asgn = new BucketAssignments();
+ assertFalse(asgn.equals(null));
+
+ // same object
+ asgn = new BucketAssignments();
+ assertTrue(asgn.equals(asgn));
+
+ // different class of object
+ asgn = new BucketAssignments();
+ assertFalse(asgn.equals("not an assignment object"));
+
+ // with null assignments
+ asgn = new BucketAssignments();
+ assertTrue(asgn.equals(new BucketAssignments()));
+
+ assertFalse(asgn.equals(new BucketAssignments(new String[] {"abc"})));
+
+ // with empty array
+ asgn = new BucketAssignments(new String[0]);
+ assertTrue(asgn.equals(asgn));
+
+ assertFalse(asgn.equals(new BucketAssignments()));
+ assertFalse(asgn.equals(new BucketAssignments(new String[] {"abc"})));
+
+ // with null items
+ String[] arr = {"abc", null, "def"};
+ asgn = new BucketAssignments(arr);
+ assertTrue(asgn.equals(asgn));
+ assertTrue(asgn.equals(new BucketAssignments(arr)));
+ assertTrue(asgn.equals(new BucketAssignments(new String[] {"abc", null, "def"})));
+
+ assertFalse(asgn.equals(new BucketAssignments()));
+ assertFalse(asgn.equals(new BucketAssignments(new String[] {"abc", null, "XYZ"})));
+
+ assertFalse(asgn.equals(new BucketAssignments()));
+ }
+
+ /**
+ * Expects an exception when checkValidity() is called.
+ *
+ * @param asgn assignments to be checked
+ */
+ private void expectException(BucketAssignments asgn) {
+ try {
+ asgn.checkValidity();
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
new file mode 100644
index 00000000..c56caca8
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
@@ -0,0 +1,217 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+
+public class ForwardTest extends BasicMessageTester<Forward> {
+ // values set by makeValidMessage()
+ public static final CommInfrastructure VALID_PROTOCOL = CommInfrastructure.UEB;
+ public static final int VALID_HOPS = 0;
+ public static final String VALID_TOPIC = "topicA";
+ public static final String VALID_PAYLOAD = "payloadA";
+ public static final String VALID_REQUEST_ID = "requestIdA";
+
+ /**
+ * Time, in milliseconds, after which the most recent message was created.
+ */
+ private static long tcreateMs;
+
+ public ForwardTest() {
+ super(Forward.class);
+ }
+
+ @Test
+ public void testBumpNumHops() {
+ Forward msg = makeValidMessage();
+
+ for (int x = 0; x < 3; ++x) {
+ assertEquals("x=" + x, x, msg.getNumHops());
+ msg.bumpNumHops();
+ }
+ }
+
+ @Test
+ public void testGetNumHops_testSetNumHops() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_HOPS, msg.getNumHops());
+
+ msg.setNumHops(5);
+ assertEquals(5, msg.getNumHops());
+
+ msg.setNumHops(7);
+ assertEquals(7, msg.getNumHops());
+ }
+
+ @Test
+ public void testGetCreateTimeMs_testSetCreateTimeMs() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertTrue(msg.getCreateTimeMs() >= tcreateMs);
+
+ msg.setCreateTimeMs(1000L);
+ assertEquals(1000L, msg.getCreateTimeMs());
+
+ msg.setCreateTimeMs(2000L);
+ assertEquals(2000L, msg.getCreateTimeMs());
+ }
+
+ @Test
+ public void testGetProtocol_testSetProtocol() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(CommInfrastructure.UEB, msg.getProtocol());
+
+ msg.setProtocol(CommInfrastructure.DMAAP);
+ assertEquals(CommInfrastructure.DMAAP, msg.getProtocol());
+
+ msg.setProtocol(CommInfrastructure.UEB);
+ assertEquals(CommInfrastructure.UEB, msg.getProtocol());
+ }
+
+ @Test
+ public void testGetTopic_testSetTopic() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_TOPIC, msg.getTopic());
+
+ msg.setTopic("topicX");
+ assertEquals("topicX", msg.getTopic());
+
+ msg.setTopic("topicY");
+ assertEquals("topicY", msg.getTopic());
+ }
+
+ @Test
+ public void testGetPayload_testSetPayload() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_PAYLOAD, msg.getPayload());
+
+ msg.setPayload("payloadX");
+ assertEquals("payloadX", msg.getPayload());
+
+ msg.setPayload("payloadY");
+ assertEquals("payloadY", msg.getPayload());
+ }
+
+ @Test
+ public void testGetRequestId_testSetRequestId() {
+ Forward msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_REQUEST_ID, msg.getRequestId());
+
+ msg.setRequestId("reqX");
+ assertEquals("reqX", msg.getRequestId());
+
+ msg.setRequestId("reqY");
+ assertEquals("reqY", msg.getRequestId());
+ }
+
+ @Test
+ public void testIsExpired() {
+ Forward msg = makeValidMessage();
+
+ long tcreate = msg.getCreateTimeMs();
+ assertTrue(msg.isExpired(tcreate + 1));
+ assertTrue(msg.isExpired(tcreate + 10));
+
+ assertFalse(msg.isExpired(tcreate));
+ assertFalse(msg.isExpired(tcreate - 1));
+ assertFalse(msg.isExpired(tcreate - 10));
+ }
+
+ @Test
+ public void testCheckValidity_InvalidFields() throws Exception {
+ // null source (i.e., superclass field)
+ expectCheckValidityFailure(msg -> msg.setSource(null));
+
+ // null protocol
+ expectCheckValidityFailure(msg -> msg.setProtocol(null));
+
+ // null or empty topic
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setTopic(value));
+
+ // null payload
+ expectCheckValidityFailure(msg -> msg.setPayload(null));
+
+ // empty payload should NOT throw an exception
+ Forward forward = makeValidMessage();
+ forward.setPayload("");
+ forward.checkValidity();
+
+ // null or empty requestId
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setRequestId(value));
+
+ // invalid hop count
+ expectCheckValidityFailure(msg -> msg.setNumHops(-1));
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Forward makeValidMessage() {
+ tcreateMs = System.currentTimeMillis();
+
+ Forward msg = new Forward(VALID_HOST, VALID_PROTOCOL, VALID_TOPIC, VALID_PAYLOAD, VALID_REQUEST_ID);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+ @Override
+ public void testDefaultConstructorFields(Forward msg) {
+ super.testDefaultConstructorFields(msg);
+
+ assertEquals(VALID_HOPS, msg.getNumHops());
+ assertEquals(0, msg.getCreateTimeMs());
+ assertNull(msg.getPayload());
+ assertNull(msg.getProtocol());
+ assertNull(msg.getRequestId());
+ assertNull(msg.getTopic());
+ }
+
+ @Override
+ public void testValidFields(Forward msg) {
+ super.testValidFields(msg);
+
+ assertEquals(VALID_HOPS, msg.getNumHops());
+ assertTrue(msg.getCreateTimeMs() >= tcreateMs);
+ assertEquals(VALID_PAYLOAD, msg.getPayload());
+ assertEquals(VALID_PROTOCOL, msg.getProtocol());
+ assertEquals(VALID_REQUEST_ID, msg.getRequestId());
+ assertEquals(VALID_TOPIC, msg.getTopic());
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
new file mode 100644
index 00000000..da78dbe3
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+
+public class HeartbeatTest extends BasicMessageTester<Heartbeat> {
+
+ /**
+ * Sequence number to validate time stamps within the heart beat.
+ */
+ private long sequence = 0;
+
+ public HeartbeatTest() {
+ super(Heartbeat.class);
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Heartbeat makeValidMessage() {
+ Heartbeat msg = new Heartbeat(VALID_HOST, ++sequence);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+ @Override
+ public void testDefaultConstructorFields(Heartbeat msg) {
+ super.testDefaultConstructorFields(msg);
+
+ assertEquals(sequence, msg.getTimestampMs());
+ }
+
+ @Override
+ public void testValidFields(Heartbeat msg) {
+ super.testValidFields(msg);
+
+ assertEquals(sequence, msg.getTimestampMs());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
new file mode 100644
index 00000000..8255034f
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class IdentificationTest extends MessageWithAssignmentsTester<Identification> {
+
+ public IdentificationTest() {
+ super(Identification.class);
+ }
+
+ @Before
+ public void setUp() {
+ setNullAssignments(false);
+ }
+
+ /**
+ * The superclass will already invoke testJsonEncodeDecode() to verify that
+ * things work with a fully populated message. This verifies that it also
+ * works if the assignments are null.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public final void testJsonEncodeDecode_WithNullAssignments() throws Exception {
+ setNullAssignments(true);
+ testJsonEncodeDecode();
+ }
+
+ /**
+ * The superclass will already invoke testCheckValidity() to
+ * verify that things work with a fully populated message. This verifies
+ * that it also works if the assignments are null.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testCheckValidity_NullAssignments() throws Exception {
+ // null assignments are OK
+ Identification msg = makeValidMessage();
+ msg.setAssignments(null);
+ msg.checkValidity();
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Identification makeValidMessage() {
+ Identification msg = new Identification(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
new file mode 100644
index 00000000..0f58e224
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class LeaderTest extends MessageWithAssignmentsTester<Leader> {
+
+ public LeaderTest() {
+ super(Leader.class);
+ }
+
+ @Before
+ public void setUp() {
+ setNullAssignments(false);
+ }
+
+ /**
+ * The superclass will already invoke testCheckValidity_InvalidFields() to
+ * verify that things work with a fully populated message. This verifies
+ * that it also works if the assignments are null.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testCheckValidity_InvalidFields_NullAssignments() throws Exception {
+ // null assignments are invalid
+ expectCheckValidityFailure(msg -> msg.setAssignments(null));
+ }
+
+ @Test
+ public void testCheckValidity_SourceIsNotLeader() throws Exception {
+ Leader ldr = makeValidMessage();
+
+ ldr.setSource("xyz");
+
+ // the source does not have an assignment
+ BucketAssignments asgnUnassigned = new BucketAssignments(new String[] {"abc", "def"});
+ expectCheckValidityFailure(msg -> msg.setAssignments(asgnUnassigned));
+
+ // the source is not the smallest UUID in this assignment
+ BucketAssignments asgnNotSmallest = new BucketAssignments(new String[] {VALID_HOST_PREDECESSOR, VALID_HOST});
+ expectCheckValidityFailure(msg -> msg.setAssignments(asgnNotSmallest));
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Leader makeValidMessage() {
+ Leader msg = new Leader(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java
new file mode 100644
index 00000000..432dcc3c
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import org.junit.Test;
+
+public class MessageTest extends BasicMessageTester<Message> {
+
+ public MessageTest() {
+ super(Message.class);
+ }
+
+ @Test
+ public void testGetSource_testSetSource() {
+ Message msg = new Message();
+
+ msg.setSource("hello");
+ assertEquals("hello", msg.getSource());
+ assertNull(msg.getChannel());
+
+ msg.setSource("world");
+ assertEquals("world", msg.getSource());
+ assertNull(msg.getChannel());
+ }
+
+ @Test
+ public void testGetChannel_testSetChannel() {
+ Message msg = new Message();
+
+ msg.setChannel("hello");
+ assertEquals("hello", msg.getChannel());
+ assertNull(msg.getSource());
+
+ msg.setChannel("world");
+ assertEquals("world", msg.getChannel());
+ assertNull(msg.getSource());
+ }
+
+ @Test
+ public void testCheckValidity_InvalidFields() {
+ // null or empty source
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setSource(value));
+
+ // null or empty channel
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setChannel(value));
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Message makeValidMessage() {
+ Message msg = new Message(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageWithAssignmentsTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageWithAssignmentsTester.java
new file mode 100644
index 00000000..2b670dcc
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/MessageWithAssignmentsTester.java
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import org.junit.Test;
+
+/**
+ * Superclass used to test subclasses of {@link MessageWithAssignments}.
+ *
+ * @param <T> type of {@link MessageWithAssignments} subclass that this tests
+ */
+public abstract class MessageWithAssignmentsTester<T extends MessageWithAssignments> extends BasicMessageTester<T> {
+ // values set by makeValidMessage()
+ public static final String[] VALID_ARRAY = {VALID_HOST, VALID_HOST+"_xxx"};
+ public static final BucketAssignments VALID_ASGN = new BucketAssignments(VALID_ARRAY);
+
+ /**
+ * {@code True} if {@code null} assignments are allowed, {@code false}
+ * otherwise.
+ */
+ private boolean nullAssignments;
+
+ /**
+ *
+ * @param subclazz subclass of {@link MessageWithAssignments} being tested
+ */
+ public MessageWithAssignmentsTester(Class<T> subclazz) {
+ super(subclazz);
+ }
+
+ /**
+ * Indicates whether or not {@code null} assignments should be used for the
+ * remaining tests.
+ *
+ * @param nullAssignments {@code true} to use {@code null} assignments,
+ * {@code false} otherwise
+ */
+ public void setNullAssignments(boolean nullAssignments) {
+ this.nullAssignments = nullAssignments;
+ }
+
+ public boolean isNullAssignments() {
+ return nullAssignments;
+ }
+
+ @Test
+ public void testCheckValidity_InvalidFields() throws Exception {
+ // null source (i.e., superclass field)
+ expectCheckValidityFailure(msg -> msg.setSource(null));
+
+ // empty assignments
+ expectCheckValidityFailure(msg -> msg.setAssignments(new BucketAssignments(new String[0])));
+
+ // invalid assignment
+ String[] invalidAssignment = {"abc", null};
+ expectCheckValidityFailure(msg -> msg.setAssignments(new BucketAssignments(invalidAssignment)));
+ }
+
+ @Test
+ public void testGetAssignments_testSetAssignments() {
+ MessageWithAssignments msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_ASGN, msg.getAssignments());
+
+ BucketAssignments asgn = new BucketAssignments();
+ msg.setAssignments(asgn);
+ assertEquals(asgn, msg.getAssignments());
+ }
+
+ @Override
+ public void testDefaultConstructorFields(T msg) {
+ super.testDefaultConstructorFields(msg);
+
+ assertNull(msg.getAssignments());
+ }
+
+ @Override
+ public void testValidFields(T msg) {
+ super.testValidFields(msg);
+
+ if (nullAssignments) {
+ assertNull(msg.getAssignments());
+
+ } else {
+ assertEquals(VALID_ASGN, msg.getAssignments());
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
new file mode 100644
index 00000000..8d0f4a6f
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+public class OfflineTest extends BasicMessageTester<Offline> {
+
+ public OfflineTest() {
+ super(Offline.class);
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Offline makeValidMessage() {
+ Offline msg = new Offline(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
new file mode 100644
index 00000000..0b2a986d
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+public class QueryTest extends BasicMessageTester<Query> {
+
+ public QueryTest() {
+ super(Query.class);
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Query makeValidMessage() {
+ Query msg = new Query(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java
new file mode 100644
index 00000000..428b5853
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.junit.Test;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class Trial {
+
+ @Test
+ public void test() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+
+ Message msg = new Forward("me", CommInfrastructure.DMAAP, "my topic", "a message", "my req");
+
+ String enc = mapper.writeValueAsString(msg);
+ System.out.println("enc=" + enc);
+
+ Message msg2 = mapper.readValue(enc, Message.class);
+ System.out.println("class=" + msg2.getClass());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
new file mode 100644
index 00000000..7997a4ee
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -0,0 +1,441 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class ActiveStateTest extends BasicStateTester {
+
+ private ActiveState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ActiveState(mgr);
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ // ensure the timers were created
+ verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class));
+
+ // ensure a heart beat was generated
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testProcessHeartbeat_NullHost() {
+ assertNull(state.process(new Heartbeat()));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_MyHost() {
+ assertNull(state.process(new Heartbeat(MY_HOST, 0L)));
+
+ assertTrue(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_Predecessor() {
+ assertNull(state.process(new Heartbeat(HOST2, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertTrue(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessHeartbeat_OtherHost() {
+ assertNull(state.process(new Heartbeat(HOST3, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ public void testProcessOffline_NullHost() {
+ // should be ignored
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ public void testProcessOffline_UnassignedHost() {
+ // HOST4 is not in the assignment list - should be ignored
+ assertNull(state.process(new Offline(HOST4)));
+ }
+
+ @Test
+ public void testProcessOffline_IAmLeader() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // one of the assigned hosts went offline
+ assertEquals(next, state.process(new Offline(HOST1)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ public void testProcessOffline_PredecessorIsLeaderNowOffline() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // I am not the leader, but my predecessor was
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ // my predecessor went offline
+ assertEquals(next, state.process(new Offline(PREV_HOST)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ public void testProcessOffline__PredecessorIsNotLeaderNowOffline() {
+ // I am not the leader, and neither is my predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2}));
+ state = new ActiveState(mgr);
+
+ /*
+ *
+ * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader
+ * thus should be ignored.
+ */
+ assertNull(state.process(new Offline(PREV_HOST2)));
+ }
+
+ @Test
+ public void testProcessOffline_OtherAssignedHostOffline() {
+ // I am not the leader
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ /*
+ * HOST1 has buckets, but it isn't the leader and it isn't my
+ * predecessor, thus should be ignored.
+ */
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testActiveState() {
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+
+ // verify that it determined its neighbors
+ assertEquals(HOST1, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+ }
+
+ @Test
+ public void testDetmNeighbors() {
+ // if only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+ assertEquals(null, state.getSuccHost());
+ assertEquals("", state.getPredHost());
+
+ // two hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+
+ // three hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST3, state.getPredHost());
+
+ // more hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST4, state.getPredHost());
+ }
+
+ @Test
+ public void testAddTimers_WithPredecessor() {
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(3, repeatedFutures.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+
+ // predecessor's heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ }
+
+ @Test
+ public void testAddTimers_SansPredecessor() {
+ // only one host, thus no predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(2, repeatedFutures.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ }
+
+ @Test
+ public void testAddTimers_HeartbeatGenerator() {
+ // only one host so we only have to look at one heart beat at a time
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove();
+
+ verify(mgr).publish(anyString(), any(Heartbeat.class));
+
+ // fire the task
+ assertNull(task.third.fire(null));
+
+ // should have generated a second pair of heart beats
+ verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testAddTimers_MyHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // indicate that this host is still alive
+ state.process(new Heartbeat(MY_HOST, 0L));
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // fire the task - should not transition
+ assertNull(task.third.fire(null));
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testAddTimers_MyHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.third.fire(null));
+
+ // should indicate failure
+ verify(mgr).internalTopicFailed();
+
+ // should publish an offline message
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testAddTimers_PredecessorHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // indicate that the predecessor is still alive
+ state.process(new Heartbeat(HOST2, 0L));
+
+ // set up next state, just in case
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should NOT transition
+ assertNull(task.third.fire(null));
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testAddTimers_PredecessorHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.third.fire(null));
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testGenHeartbeat_OneHost() {
+ // only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ state.start();
+
+ verify(mgr, times(1)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+ @Test
+ public void testGenHeartbeat_MultipleHosts() {
+ state.start();
+
+ verify(mgr, times(2)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg;
+ int index = 0;
+
+ // this message should go to itself
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+
+ // this message should go to its successor
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(HOST1, msg.first);
+ assertEquals(MY_HOST, msg.second.getSource());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
new file mode 100644
index 00000000..e48742f7
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
@@ -0,0 +1,318 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+
+/**
+ * Superclass used to test subclasses of {@link Message}.
+ */
+public class BasicStateTester {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+
+ protected static final String MY_TOPIC = "myTopic";
+
+ protected static final String PREV_HOST = "prevHost";
+ protected static final String PREV_HOST2 = PREV_HOST + "A";
+
+ // this follows PREV_HOST, alphabetically
+ protected static final String MY_HOST = PREV_HOST + "X";
+
+ // these follow MY_HOST, alphabetically
+ protected static final String HOST1 = MY_HOST + "1";
+ protected static final String HOST2 = MY_HOST + "2";
+ protected static final String HOST3 = MY_HOST + "3";
+ protected static final String HOST4 = MY_HOST + "4";
+
+ protected static final String LEADER = HOST1;
+
+ protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
+
+ protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
+ protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
+
+ /**
+ * Futures returned by schedule().
+ */
+ protected LinkedList<ScheduledFuture<?>> onceFutures;
+
+ /**
+ * Tasks captured via schedule().
+ */
+ protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
+
+ /**
+ * Futures returned by scheduleWithFixedDelay().
+ */
+ protected LinkedList<ScheduledFuture<?>> repeatedFutures;
+
+ /**
+ * Tasks captured via scheduleWithFixedDelay().
+ */
+ protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
+
+ /**
+ * Messages captured via publish().
+ */
+ protected LinkedList<Pair<String, Message>> published;
+
+ /**
+ * Messages captured via publishAdmin().
+ */
+ protected LinkedList<Message> admin;
+
+ protected PoolingManager mgr;
+ protected PoolingProperties props;
+ protected State prevState;
+
+ public BasicStateTester() {
+ super();
+ }
+
+ public void setUp() throws Exception {
+ onceFutures = new LinkedList<>();
+ onceTasks = new LinkedList<>();
+
+ repeatedFutures = new LinkedList<>();
+ repeatedTasks = new LinkedList<>();
+
+ published = new LinkedList<>();
+ admin = new LinkedList<>();
+
+ mgr = mock(PoolingManager.class);
+ props = mock(PoolingProperties.class);
+
+ when(mgr.getHost()).thenReturn(MY_HOST);
+ when(mgr.getTopic()).thenReturn(MY_TOPIC);
+ when(mgr.getProperties()).thenReturn(props);
+
+ when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+
+ prevState = new State(mgr) {
+ @Override
+ public Map<String, Object> getFilter() {
+ throw new UnsupportedOperationException("cannot filter");
+ }
+ };
+
+ // capture publish() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ published.add(new Pair<>((String) args[0], (Message) args[1]));
+
+ return null;
+ }).when(mgr).publish(anyString(), any(Message.class));
+
+ // capture publishAdmin() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ admin.add((Message) args[0]);
+
+ return null;
+ }).when(mgr).publishAdmin(any(Message.class));
+
+ // capture schedule() arguments, and return a new future
+ when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
+
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ onceFutures.add(fut);
+ return fut;
+ });
+
+ // capture scheduleWithFixedDelay() arguments, and return a new future
+ when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
+
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ repeatedFutures.add(fut);
+ return fut;
+ });
+
+ // get/set assignments in the manager
+ AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
+
+ when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
+
+ doAnswer(args -> {
+ asgn.set(args.getArgumentAt(0, BucketAssignments.class));
+ return null;
+ }).when(mgr).startDistributing(any());
+ }
+
+ /**
+ * Makes a sorted set of hosts.
+ *
+ * @param hosts the hosts to be sorted
+ * @return the set of hosts, sorted
+ */
+ protected SortedSet<String> sortHosts(String... hosts) {
+ return new TreeSet<>(Arrays.asList(hosts));
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin
+ * channel.
+ *
+ * @return the host array, as a list
+ */
+ protected List<String> captureHostList() {
+ return Arrays.asList(captureHostArray());
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin
+ * channel.
+ *
+ * @return the host array
+ */
+ protected String[] captureHostArray() {
+ BucketAssignments asgn = captureAssignments();
+
+ String[] arr = asgn.getHostArray();
+ assertNotNull(arr);
+
+ return arr;
+ }
+
+ /**
+ * Captures the assignments from the Leader message published to the admin
+ * channel.
+ *
+ * @return the bucket assignments
+ */
+ protected BucketAssignments captureAssignments() {
+ Leader msg = captureAdminMessage(Leader.class);
+
+ BucketAssignments asgn = msg.getAssignments();
+ assertNotNull(asgn);
+ return asgn;
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
+ return captureAdminMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
+ return clazz.cast(admin.get(index));
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
+ return capturePublishedMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
+ Pair<String, Message> msg = published.get(index);
+ return new Pair<>(msg.first, clazz.cast(msg.second));
+ }
+
+ /**
+ * Pair of values.
+ *
+ * @param <F> first value's type
+ * @param <S> second value's type
+ */
+ public static class Pair<F, S> {
+ public final F first;
+ public final S second;
+
+ public Pair(F first, S second) {
+ this.first = first;
+ this.second = second;
+ }
+ }
+
+ /**
+ * Pair of values.
+ *
+ * @param <F> first value's type
+ * @param <S> second value's type
+ * @param <T> third value's type
+ */
+ public static class Triple<F, S, T> {
+ public final F first;
+ public final S second;
+ public final T third;
+
+ public Triple(F first, S second, T third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
new file mode 100644
index 00000000..ba517194
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
+import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.Map;
+import org.junit.Test;
+
+public class FilterUtilsTest {
+
+ @Test
+ public void testMakeEquals() {
+ checkEquals("abc", "def", makeEquals("abc", "def"));
+ }
+
+ @Test
+ public void testMakeAnd() {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> filter =
+ makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
+
+ checkArray(CLASS_AND, 3, filter);
+ checkEquals("an1", "av1", getItem(filter, 0));
+ checkEquals("an2", "av2", getItem(filter, 1));
+ checkEquals("an3", "av3", getItem(filter, 2));
+ }
+
+ @Test
+ public void testMakeOr() {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> filter =
+ makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
+
+ checkArray(CLASS_OR, 3, filter);
+ checkEquals("on1", "ov1", getItem(filter, 0));
+ checkEquals("on2", "ov2", getItem(filter, 1));
+ checkEquals("on3", "ov3", getItem(filter, 2));
+ }
+
+ /**
+ * Checks that the filter contains an array.
+ *
+ * @param expectedClassName type of filter this should represent
+ * @param expectedCount number of items expected in the array
+ * @param filter filter to be examined
+ */
+ protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
+ assertEquals(expectedClassName, filter.get(JSON_CLASS));
+
+ Object[] val = (Object[]) filter.get(JSON_FILTERS);
+ assertEquals(expectedCount, val.length);
+ }
+
+ /**
+ * Checks that a map represents an "equals".
+ *
+ * @param name name of the field on the left side of the equals
+ * @param value value on the right side of the equals
+ * @param map map whose content is to be examined
+ */
+ protected void checkEquals(String name, String value, Map<String, Object> map) {
+ assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
+ assertEquals(name, map.get(JSON_FIELD));
+ assertEquals(value, map.get(JSON_VALUE));
+ }
+
+ /**
+ * Gets a particular sub-filter from the array contained within a filter.
+ *
+ * @param filter containing filter
+ * @param index index of the sub-filter of interest
+ * @return the sub-filter with the given index
+ */
+ @SuppressWarnings("unchecked")
+ protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
+ Object[] val = (Object[]) filter.get(JSON_FILTERS);
+
+ return (Map<String, Object>) val[index];
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
new file mode 100644
index 00000000..96c59719
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class IdleStateTest extends BasicStateTester {
+
+ private IdleState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new IdleState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStop() {
+ state.stop();
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessForward() {
+ Forward msg = new Forward();
+ assertNull(state.process(msg));
+
+ verify(mgr).handle(msg);
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification(PREV_HOST, null)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessLeader() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, PREV_HOST, MY_HOST});
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should stay in current state, but start distributing
+ assertNull(state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline(PREV_HOST)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ verifyNothingPublished();
+ }
+
+ /**
+ * Verifies that nothing was published on either channel.
+ */
+ private void verifyNothingPublished() {
+ verify(mgr, never()).publish(any(), any());
+ verify(mgr, never()).publishAdmin(any());
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
new file mode 100644
index 00000000..48d5b1ed
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.Message;
+
+public class InactiveStateTest extends BasicStateTester {
+
+ private InactiveState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new InactiveState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testGoInatcive() {
+ assertNull(state.goInactive());
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+ }
+
+ @Test
+ public void testInactiveState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
new file mode 100644
index 00000000..d60ad2ea
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -0,0 +1,328 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class ProcessingStateTest extends BasicStateTester {
+
+ private ProcessingState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ProcessingState(mgr, MY_HOST);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testProcessingState() {
+ /*
+ * Null assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Empty assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ state = new ProcessingState(mgr, LEADER);
+ assertEquals(MY_HOST, state.getHost());
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ /*
+ * Now try something with assignments.
+ */
+ when(mgr.getAssignments()).thenReturn(ASGN3);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testProcessingState_NullLeader() {
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ state = new ProcessingState(mgr, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testProcessingState_ZeroLengthHostArray() {
+ when(mgr.getAssignments()).thenReturn(new BucketAssignments(new String[] {}));
+ state = new ProcessingState(mgr, LEADER);
+ }
+
+ @Test
+ public void testMakeIdentification() {
+ Identification ident = state.makeIdentification();
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ public void testGetAssignments() {
+ // assignments from constructor
+ assertEquals(ASGN3, state.getAssignments());
+
+ // null assignments - no effect
+ state.setAssignments(null);
+ assertEquals(ASGN3, state.getAssignments());
+
+ // empty assignments
+ state.setAssignments(EMPTY_ASGN);
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ // non-empty assignments
+ state.setAssignments(ASGN3);
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testSetAssignments() {
+ state.setAssignments(null);
+ verify(mgr, never()).startDistributing(any());
+
+ state.setAssignments(ASGN3);
+ verify(mgr).startDistributing(ASGN3);
+ }
+
+ @Test
+ public void testGetLeader() {
+ // check value from constructor
+ assertEquals(MY_HOST, state.getLeader());
+
+ state.setLeader(HOST2);
+ assertEquals(HOST2, state.getLeader());
+
+ state.setLeader(HOST3);
+ assertEquals(HOST3, state.getLeader());
+ }
+
+ @Test
+ public void testSetLeader() {
+ state.setLeader(MY_HOST);
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetLeader_Null() {
+ state.setLeader(null);
+ }
+
+ @Test
+ public void testIsLeader() {
+ state.setLeader(MY_HOST);
+ assertTrue(state.isLeader());
+
+ state.setLeader(HOST2);
+ assertFalse(state.isLeader());
+ }
+
+ @Test
+ public void testBecomeLeader() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, state.becomeLeader(sortHosts(MY_HOST, HOST2)));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ verify(mgr).startDistributing(msg.getAssignments());
+ verify(mgr).goActive();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBecomeLeader_NotFirstAlive() {
+ // alive list contains something before my host name
+ state.becomeLeader(sortHosts(PREV_HOST, MY_HOST));
+ }
+
+ @Test
+ public void testMakeLeader() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ // need a channel before invoking checkValidity()
+ msg.setChannel(Message.ADMIN);
+
+ msg.checkValidity();
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertNotNull(msg.getAssignments());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ assertTrue(msg.getAssignments().hasAssignment(HOST2));
+
+ // this one wasn't in the list of hosts, so it should have been removed
+ assertFalse(msg.getAssignments().hasAssignment(HOST1));
+ }
+
+ @Test
+ public void testMakeAssignments() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ captureAssignments().checkValidity();
+ }
+
+ @Test
+ public void testMakeBucketArray_NullAssignments() {
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, MY_HOST);
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host)));
+ }
+
+ @Test
+ public void testMakeBucketArray_ZeroAssignments() {
+ // bucket assignment with a zero-length array
+ state.setAssignments(new BucketAssignments(new String[0]));
+
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.asList(arr).stream().allMatch(host -> MY_HOST.equals(host)));
+ }
+
+ @Test
+ public void testMakeBucketArray() {
+ /*
+ * All hosts are still alive, so it should have the exact same
+ * assignments as it had to start.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(HOST_ARR3));
+
+ String[] arr = captureHostArray();
+
+ assertTrue(arr != HOST_ARR3);
+ assertEquals(Arrays.asList(HOST_ARR3), Arrays.asList(arr));
+ }
+
+ @Test
+ public void testRemoveExcessHosts() {
+ /**
+ * All hosts are still alive, plus some others.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3, HOST4));
+
+ // assignments should be unchanged
+ assertEquals(Arrays.asList(HOST_ARR3), captureHostList());
+ }
+
+ @Test
+ public void testAddIndicesToHostBuckets() {
+ // some are null, some hosts are no longer alive
+ String[] asgn = {null, MY_HOST, HOST3, null, HOST4, HOST1, HOST2};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ // every bucket should be assigned to one of the three hosts
+ String[] expected = {MY_HOST, MY_HOST, HOST1, HOST2, MY_HOST, HOST1, HOST2};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ public void testAssignNullBuckets() {
+ /*
+ * Ensure buckets are assigned to the host with the fewest buckets.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, null, null, null, null, null, MY_HOST};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ String[] expected = {MY_HOST, HOST1, MY_HOST, HOST2, HOST1, HOST2, HOST1, HOST2, MY_HOST};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ public void testRebalanceBuckets() {
+ /**
+ * Some are very lopsided.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3));
+
+ String[] expected = {HOST2, HOST1, HOST3, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
new file mode 100644
index 00000000..d714d5cc
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -0,0 +1,462 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class QueryStateTest extends BasicStateTester {
+
+ private QueryState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new QueryState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+ }
+
+ @Test
+ public void testGoQuery() {
+ assertNull(state.process(new Query()));
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessIdentification_NullSource() {
+ assertNull(state.process(new Identification()));
+
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessIdentification_NewLeader() {
+ assertNull(state.process(new Identification(PREV_HOST, null)));
+
+ assertEquals(PREV_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessIdentification_NotNewLeader() {
+ assertNull(state.process(new Identification(HOST2, null)));
+
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessLeader_NullAssignment() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_NullSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(null, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_SourceIsNotAssignmentLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST2, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_EmptyAssignment() {
+ Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_BetterLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_NotABetterLeader() {
+ // no assignments yet
+ mgr.startDistributing(null);
+ state = new QueryState(mgr);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should stay in the same state
+ assertNull(state.process(msg));
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // should have started distributing
+ verify(mgr).startDistributing(asgn);
+
+ // this host should still be the leader
+ assertEquals(MY_HOST, state.getLeader());
+
+ // new assignments
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessOffline_NullHost() {
+ assertNull(state.process(new Offline()));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessOffline_SameHost() {
+ assertNull(state.process(new Offline(MY_HOST)));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessOffline_DiffHost() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ // tell it that the hosts are alive
+ state.process(new Identification(PREV_HOST, asgn));
+ state.process(new Identification(HOST1, asgn));
+
+ // #2 goes offline
+ assertNull(state.process(new Offline(HOST1)));
+
+ // #1 should still be the leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // #1 goes offline
+ assertNull(state.process(new Offline(PREV_HOST)));
+
+ // this should still be the leader now
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ public void testProcessQuery() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(null, state.process(new Query()));
+
+ verify(mgr).publishAdmin(any(Identification.class));
+ }
+
+ @Test
+ public void testQueryState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testAwaitIdentification_Leader() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should have published a Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+ assertEquals(MY_HOST, msg.getSource());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ }
+
+ @Test
+ public void testAwaitIdentification_HasAssignment() {
+ // not the leader, but has an assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ // set up active state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+
+ // should have gone active with the current assignments
+ verify(mgr).goActive();
+ }
+
+ @Test
+ public void testAwaitIdentification_NoAssignment() {
+ // not the leader and no assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
+ assertNotNull(timer.second);
+
+ // set up inactive state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+ }
+
+ @Test
+ public void testHasAssignment() {
+ // null assignment
+ mgr.startDistributing(null);
+ assertFalse(state.hasAssignment());
+
+ // not in assignments
+ state.setAssignments(new BucketAssignments(new String[] {HOST3}));
+ assertFalse(state.hasAssignment());
+
+ // it IS in the assignments
+ state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
+ assertTrue(state.hasAssignment());
+ }
+
+ @Test
+ public void testRecordInfo_NullSource() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(null, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_SourcePreceedsMyHost() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(PREV_HOST, asgn));
+
+ // new leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_SourceFollowsMyHost() {
+ mgr.startDistributing(null);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewIsNull() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(HOST1, null));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewIsEmpty() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(PREV_HOST, new BucketAssignments()));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_OldIsNull() {
+ mgr.startDistributing(null);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_OldIsEmpty() {
+ state.setAssignments(new BucketAssignments());
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewLeaderPreceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(HOST3, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ public void testRecordInfo_NewLeaderSucceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3});
+ state.process(new Identification(HOST3, asgn));
+
+ // should be unchanged
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
new file mode 100644
index 00000000..f29d2348
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -0,0 +1,180 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class StartStateTest extends BasicStateTester {
+
+ private StartState state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new StartState(mgr);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+
+ // get the sub-filter
+ filter = utils.getItem(filter, 1);
+
+ utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
+ utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+
+ assertEquals(MY_HOST, msg.first);
+ assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs());
+
+ Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.second.fire(null));
+
+ verify(mgr).internalTopicFailed();
+ }
+
+ @Test
+ public void testStartStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testStartStateState() {
+ // create a new state from the current state
+ state = new StartState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testProcessForward() {
+ assertNull(state.process(new Forward()));
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ Heartbeat msg = new Heartbeat();
+
+ // no matching data in heart beat
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same source, different time stamp
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs() - 1);
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same time stamp, different source
+ msg.setSource("unknown");
+ msg.setTimestampMs(state.getHbTimestampMs());
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // matching heart beat
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs());
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(msg));
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification(MY_HOST, null)));
+ }
+
+ @Test
+ public void testProcessLeader() {
+ assertNull(state.process(new Leader(MY_HOST, null)));
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ public void testGetHbTimestampMs() {
+ long tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+
+ tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
new file mode 100644
index 00000000..1be48e21
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -0,0 +1,440 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+public class StateTest extends BasicStateTester {
+
+ private State state;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new MyState(mgr);
+ }
+
+ @Test
+ public void testStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testStateState() {
+ // allocate a new state, copying from the old state
+ state = new MyState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(),
+ * which delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testCancelTimers() {
+ int delay = 100;
+ int initDelay = 200;
+
+ /*
+ * Create three tasks tasks.
+ */
+
+ StateTimerTask task1 = mock(StateTimerTask.class);
+ StateTimerTask task2 = mock(StateTimerTask.class);
+ StateTimerTask task3 = mock(StateTimerTask.class);
+
+ // two tasks via schedule()
+ state.schedule(delay, task1);
+ state.schedule(delay, task2);
+
+ // one task via scheduleWithFixedDelay()
+ state.scheduleWithFixedDelay(initDelay, delay, task3);
+
+ // ensure all were scheduled, but not yet canceled
+ verify(mgr).schedule(delay, task1);
+ verify(mgr).schedule(delay, task2);
+ verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
+
+ ScheduledFuture<?> fut1 = onceFutures.removeFirst();
+ ScheduledFuture<?> fut2 = onceFutures.removeFirst();
+ ScheduledFuture<?> fut3 = repeatedFutures.removeFirst();
+
+ verify(fut1, never()).cancel(false);
+ verify(fut2, never()).cancel(false);
+ verify(fut3, never()).cancel(false);
+
+ /*
+ * Cancel the timers.
+ */
+ state.cancelTimers();
+
+ // verify that all were cancelled
+ verify(fut1).cancel(false);
+ verify(fut2).cancel(false);
+ verify(fut3).cancel(false);
+ }
+
+ @Test
+ public void testGetFilter() {
+ Map<String, Object> filter = state.getFilter();
+
+ FilterUtilsTest utils = new FilterUtilsTest();
+
+ utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
+ utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
+ }
+
+ @Test
+ public void testStart() {
+ state.start();
+ }
+
+ @Test
+ public void testStop() {
+ state.stop();
+
+ assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource());
+ }
+
+ @Test
+ public void testGoStart() {
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ State next2 = state.goStart();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ State next2 = state.goQuery();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoActive() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ State next2 = state.goActive();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testGoInactive() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.goInactive();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ public void testProcessForward() {
+ Forward msg = new Forward();
+ assertNull(state.process(msg));
+
+ verify(mgr).handle(msg);
+ }
+
+ @Test
+ public void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat()));
+ }
+
+ @Test
+ public void testProcessIdentification() {
+ assertNull(state.process(new Identification()));
+ }
+
+ @Test
+ public void testProcessLeader_NullAssignment() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_NullSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(null, asgn);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_EmptyAssignment() {
+ Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_MyHostAssigned() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessLeader_MyHostUnassigned() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should go Inactive and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goActive();
+ }
+
+ @Test
+ public void testProcessOffline() {
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ public void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ public void testPublishIdentification() {
+ Identification msg = new Identification();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishLeader() {
+ Leader msg = new Leader();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishOffline() {
+ Offline msg = new Offline();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishQuery() {
+ Query msg = new Query();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ public void testPublishStringForward() {
+ String chnl = "channelF";
+ Forward msg = new Forward();
+
+ state.publish(chnl, msg);
+
+ verify(mgr).publish(chnl, msg);
+ }
+
+ @Test
+ public void testPublishStringHeartbeat() {
+ String chnl = "channelH";
+ Heartbeat msg = new Heartbeat();
+
+ state.publish(chnl, msg);
+
+ verify(mgr).publish(chnl, msg);
+ }
+
+ @Test
+ public void testStartDistributing() {
+ BucketAssignments asgn = new BucketAssignments();
+ state.startDistributing(asgn);
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testStartDistributing_NullAssignments() {
+ state.startDistributing(null);
+
+ verify(mgr, never()).startDistributing(any());
+ }
+
+ @Test
+ public void testSchedule() {
+ int delay = 100;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.schedule(delay, task);
+
+ ScheduledFuture<?> fut = onceFutures.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).schedule(delay, task);
+ verify(fut, never()).cancel(false);
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel
+ * its timers and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(fut).cancel(false);
+ }
+
+ @Test
+ public void testScheduleWithFixedDelay() {
+ int initdel = 100;
+ int delay = 200;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.scheduleWithFixedDelay(initdel, delay, task);
+
+ ScheduledFuture<?> fut = repeatedFutures.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
+ verify(fut, never()).cancel(false);
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel
+ * its timers and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(fut).cancel(false);
+ }
+
+ @Test
+ public void testInternalTopicFailed() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.internalTopicFailed();
+ assertEquals(next, next2);
+
+ verify(mgr).internalTopicFailed();
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testMakeHeartbeat() {
+ long timestamp = 30000L;
+ Heartbeat msg = state.makeHeartbeat(timestamp);
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertEquals(timestamp, msg.getTimestampMs());
+ }
+
+ @Test
+ public void testMakeOffline() {
+ Offline msg = state.makeOffline();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testMakeQuery() {
+ Query msg = state.makeQuery();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ public void testGetHost() {
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(MY_TOPIC, state.getTopic());
+ }
+
+ /**
+ * State used for testing purposes, with abstract methods implemented.
+ */
+ private class MyState extends State {
+
+ public MyState(PoolingManager mgr) {
+ super(mgr);
+ }
+ }
+}