summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java213
1 files changed, 82 insertions, 131 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index 13d70f52..cc588384 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -26,13 +26,13 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
@@ -43,13 +43,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -71,7 +69,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
/**
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
- * its own feature object.
+ * its own feature object. Uses real feature objects. However, the following are not:
+ * <dl>
+ * <dt>DMaaP sources and sinks</dt>
+ * <dd>simulated using queues. There is one queue for the external topic, and one queue
+ * for each host's internal topic. Messages published to the "admin" channel are simply
+ * sent to all of the hosts' internal topic queues</dd>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
*/
public class FeatureTest {
@@ -92,26 +98,15 @@ public class FeatureTest {
*/
private static final String CONTROLLER1 = "controller.one";
- // private static final long STD_HEARTBEAT_WAIT_MS = 100;
- // private static final long STD_REACTIVATE_WAIT_MS = 200;
- // private static final long STD_IDENTIFICATION_MS = 60;
- // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
- // private static final long STD_INTER_HEARTBEAT_MS = 50;
- // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
- // private static final long POLL_MS = 2;
- // private static final long INTER_POLL_MS = 2;
- // private static final long EVENT_WAIT_SEC = 5;
-
- // use these to slow things down
- private static final long STD_HEARTBEAT_WAIT_MS = 5000;
- private static final long STD_REACTIVATE_WAIT_MS = 10000;
- private static final long STD_IDENTIFICATION_MS = 10000;
- private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
- private static final long STD_INTER_HEARTBEAT_MS = 12000;
- private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
- private static final long POLL_MS = 2;
- private static final long INTER_POLL_MS = 2000;
- private static final long EVENT_WAIT_SEC = 1000;
+ private static long stdReactivateWaitMs = 200;
+ private static long stdIdentificationMs = 60;
+ private static long stdStartHeartbeatMs = 60;
+ private static long stdActiveHeartbeatMs = 50;
+ private static long stdInterHeartbeatMs = 5;
+ private static long stdOfflinePubWaitMs = 2;
+ private static long stdPollMs = 2;
+ private static long stdInterPollMs = 2;
+ private static long stdEventWaitSec = 10;
// these are saved and restored on exit from this test class
private static PoolingFeature.Factory saveFeatureFactory;
@@ -128,6 +123,8 @@ public class FeatureTest {
saveFeatureFactory = PoolingFeature.getFactory();
saveManagerFactory = PoolingManagerImpl.getFactory();
saveDmaapFactory = DmaapManager.getFactory();
+
+ // note: invoke runSlow() to slow things down
}
@AfterClass
@@ -149,51 +146,35 @@ public class FeatureTest {
}
}
- @Ignore
@Test
public void test_SingleHost() throws Exception {
- int nmessages = 70;
-
- ctx = new Context(nmessages);
-
- ctx.addHost();
- ctx.startHosts();
-
- for (int x = 0; x < nmessages; ++x) {
- ctx.offerExternal(makeMessage(x));
- }
-
- ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
-
- assertEquals(0, ctx.getDecodeErrors());
- assertEquals(0, ctx.getRemainingEvents());
- ctx.checkAllSawAMsg();
+ run(70, 1);
}
- @Ignore
@Test
public void test_TwoHosts() throws Exception {
- int nmessages = 200;
+ run(200, 2);
+ }
+
+ @Test
+ public void test_ThreeHosts() throws Exception {
+ run(200, 3);
+ }
+ private void run(int nmessages, int nhosts) throws Exception {
ctx = new Context(nmessages);
- ctx.addHost();
- ctx.addHost();
+ for (int x = 0; x < nhosts; ++x) {
+ ctx.addHost();
+ }
+
ctx.startHosts();
for (int x = 0; x < nmessages; ++x) {
ctx.offerExternal(makeMessage(x));
}
- // wait for all hosts to have time to process a few messages
- Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
-
- // pause a topic for a bit
-// ctx.pauseTopic();
-
- // now we'll see if it recovers
-
- ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+ ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
assertEquals(0, ctx.getDecodeErrors());
assertEquals(0, ctx.getRemainingEvents());
@@ -203,6 +184,21 @@ public class FeatureTest {
private String makeMessage(int reqnum) {
return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
}
+
+ /**
+ * Invoke this to slow the timers down.
+ */
+ protected static void runSlow() {
+ stdReactivateWaitMs = 10000;
+ stdIdentificationMs = 10000;
+ stdStartHeartbeatMs = 15000;
+ stdActiveHeartbeatMs = 12000;
+ stdInterHeartbeatMs = 5000;
+ stdOfflinePubWaitMs = 2;
+ stdPollMs = 2;
+ stdInterPollMs = 2000;
+ stdEventWaitSec = 1000;
+ }
/**
* Context used for a single test case.
@@ -244,12 +240,6 @@ public class FeatureTest {
private final CountDownLatch eventCounter;
/**
- * Maps host name to its topic source. This must be in sorted order so we can
- * identify the source for the host with the higher name.
- */
- private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
-
- /**
* The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
* {@link #getCurrentHost()}.
*/
@@ -280,9 +270,14 @@ public class FeatureTest {
/**
* Creates and adds a new host to the context.
+ *
+ * @return the new Host
*/
- public void addHost() {
- hosts.add(new Host(this));
+ public Host addHost() {
+ Host host = new Host(this);
+ hosts.add(host);
+
+ return host;
}
/**
@@ -443,26 +438,6 @@ public class FeatureTest {
}
/**
- * Associates a host with a topic.
- *
- * @param host
- * @param topic
- */
- public void addTopicSource(String host, TopicSourceImpl topic) {
- host2topic.put(host, topic);
- }
-
- /**
- * Pauses the last topic source long enough to miss a heart beat.
- */
- public void pauseTopic() {
- Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
- if (ent != null) {
- ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
- }
- }
-
- /**
* Gets the current host, provided this is used from within a call to
* {@link #withHost(Host, VoidFunction)}.
*
@@ -527,12 +502,10 @@ public class FeatureTest {
}
/**
- * Gets the host name. This should only be invoked within {@link #start()}.
- *
* @return the host name
*/
public String getName() {
- return PoolingManagerImpl.getLastHost();
+ return feature.getHost();
}
/**
@@ -759,11 +732,6 @@ public class FeatureTest {
private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
/**
- * Time, in milliseconds, to pause before polling for more messages.
- */
- private AtomicLong pauseTimeMs = new AtomicLong(0);
-
- /**
*
* @param context
* @param internal {@code true} if to read from the internal topic, {@code false}
@@ -771,12 +739,8 @@ public class FeatureTest {
*/
public TopicSourceImpl(Context context, boolean internal) {
if (internal) {
- Host host = context.getCurrentHost();
-
this.topic = INTERNAL_TOPIC;
- this.queue = host.getInternalQueue();
-
- context.addTopicSource(host.getName(), this);
+ this.queue = context.getCurrentHost().getInternalQueue();
} else {
this.topic = EXTERNAL_TOPIC;
@@ -809,11 +773,12 @@ public class FeatureTest {
reregister(newPair);
- new Thread(() -> {
+ Thread thread = new Thread(() -> {
+
try {
do {
processMessages(newPair.first(), listener);
- } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+ } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
logger.info("topic source thread completed");
@@ -827,7 +792,10 @@ public class FeatureTest {
newPair.second().countDown();
- }).start();
+ });
+
+ thread.setDaemon(true);
+ thread.start();
}
/**
@@ -879,19 +847,7 @@ public class FeatureTest {
}
/**
- * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
- * pause a bit.
- *
- * @param timeMs time, in milliseconds, to pause
- */
- public void pause(long timeMs) {
- pauseTimeMs.set(timeMs);
- }
-
- /**
- * Polls for messages from the topic and offers them to the listener. If
- * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
- * then immediately returns.
+ * Polls for messages from the topic and offers them to the listener.
*
* @param stopped triggered if processing should stop
* @param listener
@@ -901,14 +857,7 @@ public class FeatureTest {
for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
- long ptm = pauseTimeMs.getAndSet(0);
- if (ptm != 0) {
- logger.warn("pause processing");
- stopped.await(ptm, TimeUnit.MILLISECONDS);
- return;
- }
-
- String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+ String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
if (msg == null) {
return;
}
@@ -1038,18 +987,20 @@ public class FeatureTest {
props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
- props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
- props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
- props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
- props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
- props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
- "" + STD_ACTIVE_HEARTBEAT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
- "" + STD_OFFLINE_PUB_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+ props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+ "" + stdOfflinePubWaitMs);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdStartHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
+ props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
+ props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdActiveHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdInterHeartbeatMs);
return props;
}