summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java36
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java136
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java12
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java15
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java15
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java24
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java25
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java16
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java33
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java23
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java213
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java734
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java18
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java84
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java28
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java28
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java26
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java43
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java40
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java21
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java29
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java67
25 files changed, 1286 insertions, 413 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 102eda75..abb4da33 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -202,6 +202,7 @@ public class DmaapManager {
} catch (InterruptedException e) {
logger.warn("message transmission stopped due to {}", e.getMessage());
+ Thread.currentThread().interrupt();
}
try {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 21cbc4db..1e2071ab 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -21,7 +21,9 @@
package org.onap.policy.drools.pooling;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
@@ -54,6 +56,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private static Factory factory;
/**
+ * ID of this host.
+ */
+ private final String host;
+
+ /**
* Entire set of feature properties, including those specific to various controllers.
*/
private Properties featProps = null;
@@ -61,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Maps a controller name to its associated manager.
*/
- private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+ private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+ /**
+ * Decremented each time a manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
* Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
@@ -75,6 +87,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
*/
public PoolingFeature() {
super();
+
+ this.host = UUID.randomUUID().toString();
}
protected static Factory getFactory() {
@@ -90,6 +104,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
PoolingFeature.factory = factory;
}
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * @return a latch that will be decremented when a manager enters the active state
+ */
+ protected CountDownLatch getActiveLatch() {
+ return activeLatch;
+ }
+
@Override
public int getSequenceNumber() {
return 0;
@@ -123,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
PoolingProperties props = new PoolingProperties(name, featProps);
logger.info("pooling enabled for {}", name);
- ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
+ ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
} catch (PropertyException e) {
logger.error("pooling disabled due to exception for {}", name, e);
@@ -371,12 +396,15 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Makes a pooling manager for a controller.
*
+ * @param host name/uuid of this host
* @param controller
* @param props properties to use to configure the manager
+ * @param activeLatch decremented when the manager goes Active
* @return a new pooling manager
*/
- public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
- return new PoolingManagerImpl(controller, props);
+ public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ return new PoolingManagerImpl(host, controller, props, activeLatch);
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 422efdd7..de25e471 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState;
import org.onap.policy.drools.pooling.state.StartState;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
@@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private static Factory factory = new Factory();
/**
- * ID of the last host that was created.
- */
- private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
-
- /**
* ID of this host.
*/
private final String host;
@@ -100,6 +94,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final TopicListener listener;
/**
+ * Decremented each time the manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch;
+
+ /**
* Used to encode & decode request objects received from & sent to a rule engine.
*/
private final Serializer serializer;
@@ -148,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Queue used when no bucket assignments are available.
*/
- private EventQueue eventq;
+ private final EventQueue eventq;
/**
* {@code True} if events offered by the controller should be intercepted,
@@ -158,28 +157,28 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Constructs the manager, initializing all of the data structures.
- *
+ *
+ * @param host name/uuid of this host
* @param controller controller with which this is associated
* @param props feature properties specific to the controller
+ * @param activeLatch latch to be decremented each time the manager enters the Active
+ * state
*/
- public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
- this.host = UUID.randomUUID().toString();
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ this.host = host;
this.controller = controller;
this.props = props;
-
- lastHost.set(this.host);
+ this.activeLatch = activeLatch;
try {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
-
- SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
- props.getSource());
- this.extractors = factory.makeClassExtractors(spec);
-
- this.dmaapMgr = factory.makeDmaapManager(props);
+ this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
+ this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
+ makeDmaapProps(controller, props.getSource()));
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -203,16 +202,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Used by junit tests.
- *
- * @return the ID of the last host that was created, or {@code null} if no hosts have
- * been created yet
- */
- protected static String getLastHost() {
- return lastHost.get();
- }
-
- /**
* Should only be used by junit tests.
*
* @return the current state
@@ -237,6 +226,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
+ * Makes properties for configuring extractors.
+ *
+ * @param controller the controller for which the extractors will be configured
+ * @param source properties from which to get the extractor properties
+ * @return extractor properties
+ */
+ private Properties makeExtractorProps(PolicyController controller, Properties source) {
+ return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
+ }
+
+ /**
+ * Makes properties for configuring DMaaP. Copies properties from the source that
+ * start with the Pooling property prefix followed by the controller name, stripping
+ * the prefix and controller name.
+ *
+ * @param controller the controller for which DMaaP will be configured
+ * @param source properties from which to get the DMaaP properties
+ * @return DMaaP properties
+ */
+ private Properties makeDmaapProps(PolicyController controller, Properties source) {
+ SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
+
+ // could be UEB or DMAAP, so add both
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+
+ return specProps;
+ }
+
+ /**
+ * Adds DMaaP consumer properties, consumer group & instance. The group is the host
+ * and the instance is a constant.
+ *
+ * @param props where to add the new properties
+ * @param prefix property prefix
+ */
+ private void addDmaapConsumerProps(SpecProperties props, String prefix) {
+ String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
+
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
+ }
+
+ /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*
@@ -288,8 +321,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
scheduler = null;
if (!(current instanceof IdleState)) {
- dmaapMgr.stopConsumer(this);
changeState(new IdleState(this));
+ dmaapMgr.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
}
@@ -405,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
@@ -420,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
@@ -609,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
topic);
} else {
- logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
+ logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
event.bumpNumHops();
publish(target, event);
}
@@ -751,26 +774,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public CountDownLatch startDistributing(BucketAssignments asgn) {
- if (asgn == null) {
- return null;
- }
-
- logger.info("new assignments for topic {}", getTopic());
-
synchronized (curLocker) {
+ int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+ logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
+ if (asgn == null) {
+ return null;
+ }
+
/*
* publish the events from the event queue, but do it in a background thread so
* that the state machine can enter its correct state BEFORE we start processing
* the events
*/
CountDownLatch latch = new CountDownLatch(1);
-
+
new Thread(() -> {
synchronized (curLocker) {
if (assignments == null) {
+ latch.countDown();
return;
}
@@ -779,11 +803,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
while ((ev = eventq.poll()) != null) {
handle(ev);
}
-
+
latch.countDown();
}
}).start();
-
+
return latch;
}
@@ -804,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public State goActive() {
+ activeLatch.countDown();
return new ActiveState(this);
}
@@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Creates a DMaaP manager.
*
- * @param props properties used to configure the manager
+ * @param topic name of the internal DMaaP topic
+ * @param props properties used to configure DMaaP
* @return a new DMaaP manager
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
- return new DmaapManager(props.getPoolingTopic(), props.getSource());
+ public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ return new DmaapManager(topic, props);
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
index 4e8de0d0..54319423 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled";
public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit";
public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds";
+ public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds";
public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds";
public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
- public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
/**
* Type of item that the extractors will be extracting.
@@ -94,10 +94,17 @@ public class PoolingProperties extends SpecPropertyConfiguration {
private long offlineAgeMs;
/**
+ * Time, in milliseconds, to wait for an "Offline" message to be published
+ * to DMaaP.
+ */
+ @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+ private long offlinePubWaitMs;
+
+ /**
* Time, in milliseconds, to wait for this host's heart beat during the
* start-up state.
*/
- @Property(name = START_HEARTBEAT_MS, defaultValue = "50000")
+ @Property(name = START_HEARTBEAT_MS, defaultValue = "100000")
private long startHeartbeatMs;
/**
@@ -123,19 +130,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
/**
* Time, in milliseconds, to wait between heart beat generations during
- * the active state.
+ * the active and start-up states.
*/
@Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
private long interHeartbeatMs;
/**
- * Time, in milliseconds, to wait for an "Offline" message to be published
- * to DMaaP.
- */
- @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
- private long offlinePubWaitMs;
-
- /**
* @param controllerName the name of the controller
* @param props set of properties used to configure this
* @throws PropertyException if an error occurs
@@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration {
return offlineAgeMs;
}
+ public long getOfflinePubWaitMs() {
+ return offlinePubWaitMs;
+ }
+
public long getStartHeartbeatMs() {
return startHeartbeatMs;
}
@@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public long getInterHeartbeatMs() {
return interHeartbeatMs;
}
-
- public long getOfflinePubWaitMs() {
- return offlinePubWaitMs;
- }
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
index 31b4e614..c831f706 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -40,7 +40,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any specialization
+ * @param prefix the property name prefix that appears before any specialization, may
+ * be ""
* @param specialization the property name specialization (e.g., session name)
*/
public SpecProperties(String prefix, String specialization) {
@@ -50,7 +51,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any specialization
+ * @param prefix the property name prefix that appears before any specialization, may
+ * be ""
* @param specialization the property name specialization (e.g., session name)
* @param props the default properties
*/
@@ -68,7 +70,7 @@ public class SpecProperties extends Properties {
* @return the text, with a trailing "."
*/
private static String withTrailingDot(String text) {
- return text.endsWith(".") ? text : text + ".";
+ return text.isEmpty() || text.endsWith(".") ? text : text + ".";
}
/**
@@ -105,11 +107,11 @@ public class SpecProperties extends Properties {
@Override
public final int hashCode() {
- throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ throw new UnsupportedOperationException("SpecProperties cannot be hashed");
}
@Override
public final boolean equals(Object obj) {
- throw new UnsupportedOperationException("cannot compare HostBuckets");
+ throw new UnsupportedOperationException("cannot compare SpecProperties");
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
index 2752ca8c..ccca77be 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -374,7 +374,7 @@ public class ClassExtractors {
} catch (NoSuchMethodException expected) {
// no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName());
+ logger.debug("no method {} in {}", nm, clazz.getName(), expected);
return null;
} catch (SecurityException e) {
@@ -445,7 +445,7 @@ public class ClassExtractors {
} catch (NoSuchFieldException expected) {
// no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName());
+ logger.debug("no field {} in {}", name, clazz.getName(), expected);
} catch (SecurityException e) {
throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
index b0a36cd9..8f0a902a 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState {
if ((succHost = assigned.higher(getHost())) == null) {
// wrapped around - successor is the first host in the set
succHost = assigned.first();
- logger.info("this host's successor is {} on topic {}", succHost, getTopic());
}
+ logger.info("this host's successor is {} on topic {}", succHost, getTopic());
if ((predHost = assigned.lower(getHost())) == null) {
// wrapped around - predecessor is the last host in the set
predHost = assigned.last();
- logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
+ logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
@Override
public void start() {
+ super.start();
addTimers();
genHeartbeat();
}
@@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState {
/*
* heart beat generator
*/
- long genMs = getProperties().getActiveHeartbeatMs();
+ long genMs = getProperties().getInterHeartbeatMs();
scheduleWithFixedDelay(genMs, genMs, () -> {
genHeartbeat();
@@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState {
/*
* my heart beat checker
*/
- long interMs = getProperties().getInterHeartbeatMs();
+ long waitMs = getProperties().getActiveHeartbeatMs();
- scheduleWithFixedDelay(interMs, interMs, () -> {
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
if (myHeartbeatSeen) {
myHeartbeatSeen = false;
return null;
@@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState {
// missed my heart beat
logger.error("missed my heartbeat on topic {}", getTopic());
- return internalTopicFailed();
+ return missedHeartbeat();
});
/*
@@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState {
*/
if (!predHost.isEmpty()) {
- scheduleWithFixedDelay(interMs, interMs, () -> {
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
if (predHeartbeatSeen) {
predHeartbeatSeen = false;
return null;
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index 6be2fb84..f717aa52 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -44,24 +44,17 @@ public class InactiveState extends State {
@Override
public void start() {
-
super.start();
-
- schedule(getProperties().getReactivateMs(), () -> goStart());
+ schedule(getProperties().getReactivateMs(), this::goStart);
}
@Override
public State process(Leader msg) {
- if(isValid(msg)) {
+ if (isValid(msg)) {
logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
- startDistributing(msg.getAssignments());
-
- if(msg.getAssignments().hasAssignment(getHost())) {
- logger.info("received Leader message on topic {}", getTopic());
- return goActive();
- }
+ return goActive(msg.getAssignments());
}
-
+
return null;
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
index 1e9bb581..e9dc0324 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -74,24 +74,6 @@ public class ProcessingState extends State {
}
/**
- * Goes active with a new set of assignments.
- *
- * @param asgn new assignments
- * @return the new state, either Active or Inactive, depending on whether or not this
- * host has an assignment
- */
- protected State goActive(BucketAssignments asgn) {
- startDistributing(asgn);
-
- if (asgn.hasAssignment(getHost())) {
- return goActive();
-
- } else {
- return goInactive();
- }
- }
-
- /**
* Generates an Identification message and goes to the query state.
*/
@Override
@@ -154,11 +136,11 @@ public class ProcessingState extends State {
}
Leader msg = makeLeader(alive);
- publish(msg);
+ logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
- setAssignments(msg.getAssignments());
+ publish(msg);
- return goActive();
+ return goActive(msg.getAssignments());
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
index 9045165b..1a4da150 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -63,7 +63,6 @@ public class QueryState extends ProcessingState {
@Override
public void start() {
-
super.start();
// start identification timer
@@ -85,39 +84,21 @@ public class QueryState extends ProcessingState {
if (!sawSelfIdent) {
// didn't see our identification
logger.error("missed our own Ident message on topic {}", getTopic());
- return internalTopicFailed();
+ return missedHeartbeat();
} else if (isLeader()) {
// "this" host is the new leader
logger.info("this host is the new leader for topic {}", getTopic());
return becomeLeader(alive);
- } else if (hasAssignment()) {
- /*
- * this host is not the new leader, but it does have an assignment -
- * return to the active state while we wait for the leader
- */
- logger.info("no new leader on topic {}", getTopic());
- return goActive();
-
} else {
- // not the leader and no assignment yet
+ // not the leader - return to previous state
logger.info("no new leader on topic {}", getTopic());
- return goInactive();
+ return goActive(getAssignments());
}
});
}
- /**
- * Determines if this host has an assignment in the CURRENT assignments.
- *
- * @return {@code true} if this host has an assignment, {@code false} otherwise
- */
- protected boolean hasAssignment() {
- BucketAssignments asgn = getAssignments();
- return (asgn != null && asgn.hasAssignment(getHost()));
- }
-
@Override
public State goQuery() {
return null;
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index a978e245..3068cfc9 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -67,8 +67,22 @@ public class StartState extends State {
super.start();
- publish(getHost(), makeHeartbeat(hbTimestampMs));
+ Heartbeat hb = makeHeartbeat(hbTimestampMs);
+ publish(getHost(), hb);
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, () -> {
+ publish(getHost(), hb);
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
schedule(getProperties().getStartHeartbeatMs(), () -> {
logger.error("missed heartbeat on topic {}", getTopic());
return internalTopicFailed();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 421b9225..545c2ef0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -113,12 +113,21 @@ public abstract class State {
}
/**
- * Transitions to the "active" state.
+ * Goes active with a new set of assignments.
*
- * @return the new state
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
*/
- public final State goActive() {
- return mgr.goActive();
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn != null && asgn.hasAssignment(getHost())) {
+ return mgr.goActive();
+
+ } else {
+ return goInactive();
+ }
}
/**
@@ -322,7 +331,21 @@ public abstract class State {
}
/**
- * Indicates that the internal topic failed.
+ * Indicates that we failed to see our own heartbeat; must be a problem with the
+ * internal topic.
+ *
+ * @return a new {@link StartState}
+ */
+ protected final State missedHeartbeat() {
+ publish(makeOffline());
+ mgr.startDistributing(null);
+
+ return mgr.goStart();
+ }
+
+ /**
+ * Indicates that the internal topic failed; this should only be invoked from the
+ * StartState.
*
* @return a new {@link InactiveState}
*/
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index a91671fd..a5688df6 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -211,8 +212,8 @@ public class DmaapManagerTest {
// force exception when it starts
doThrow(new IllegalStateException("expected")).when(sink).start();
- expectException("startPublisher,start", xxx -> mgr.startPublisher());
- expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
+ expectException("startPublisher,start", () -> mgr.startPublisher());
+ expectException("startPublisher,publish", () -> mgr.publish(MSG));
// allow it to succeed this time
reset(sink);
@@ -264,11 +265,15 @@ public class DmaapManagerTest {
long minms = 2000L;
// tell the publisher to stop in minms + additional time
- Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread(() -> {
+ latch.countDown();
+ mgr.stopPublisher(minms + 3000L);
+ });
thread.start();
- // give the thread a chance to start
- Thread.sleep(50L);
+ // wait for the thread to start
+ latch.await();
// interrupt it - it should immediately finish its work
thread.interrupt();
@@ -336,7 +341,7 @@ public class DmaapManagerTest {
@Test
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
- expectException("publish,pre", xxx -> mgr.publish(MSG));
+ expectException("publish,pre", () -> mgr.publish(MSG));
mgr.startPublisher();
@@ -352,7 +357,7 @@ public class DmaapManagerTest {
// stop and verify we can no longer publish
mgr.stopPublisher(0);
- expectException("publish,stopped", xxx -> mgr.publish(MSG));
+ expectException("publish,stopped", () -> mgr.publish(MSG));
}
@Test(expected = PoolingFeatureException.class)
@@ -377,7 +382,7 @@ public class DmaapManagerTest {
private void expectException(String testnm, VFunction func) {
try {
- func.apply(null);
+ func.apply();
fail(testnm + " missing exception");
} catch (PoolingFeatureException expected) {
@@ -387,6 +392,6 @@ public class DmaapManagerTest {
@FunctionalInterface
public static interface VFunction {
- public void apply(Void arg) throws PoolingFeatureException;
+ public void apply() throws PoolingFeatureException;
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index 13d70f52..cc588384 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -26,13 +26,13 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
@@ -43,13 +43,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -71,7 +69,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
/**
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
- * its own feature object.
+ * its own feature object. Uses real feature objects. However, the following are not:
+ * <dl>
+ * <dt>DMaaP sources and sinks</dt>
+ * <dd>simulated using queues. There is one queue for the external topic, and one queue
+ * for each host's internal topic. Messages published to the "admin" channel are simply
+ * sent to all of the hosts' internal topic queues</dd>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
*/
public class FeatureTest {
@@ -92,26 +98,15 @@ public class FeatureTest {
*/
private static final String CONTROLLER1 = "controller.one";
- // private static final long STD_HEARTBEAT_WAIT_MS = 100;
- // private static final long STD_REACTIVATE_WAIT_MS = 200;
- // private static final long STD_IDENTIFICATION_MS = 60;
- // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
- // private static final long STD_INTER_HEARTBEAT_MS = 50;
- // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
- // private static final long POLL_MS = 2;
- // private static final long INTER_POLL_MS = 2;
- // private static final long EVENT_WAIT_SEC = 5;
-
- // use these to slow things down
- private static final long STD_HEARTBEAT_WAIT_MS = 5000;
- private static final long STD_REACTIVATE_WAIT_MS = 10000;
- private static final long STD_IDENTIFICATION_MS = 10000;
- private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
- private static final long STD_INTER_HEARTBEAT_MS = 12000;
- private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
- private static final long POLL_MS = 2;
- private static final long INTER_POLL_MS = 2000;
- private static final long EVENT_WAIT_SEC = 1000;
+ private static long stdReactivateWaitMs = 200;
+ private static long stdIdentificationMs = 60;
+ private static long stdStartHeartbeatMs = 60;
+ private static long stdActiveHeartbeatMs = 50;
+ private static long stdInterHeartbeatMs = 5;
+ private static long stdOfflinePubWaitMs = 2;
+ private static long stdPollMs = 2;
+ private static long stdInterPollMs = 2;
+ private static long stdEventWaitSec = 10;
// these are saved and restored on exit from this test class
private static PoolingFeature.Factory saveFeatureFactory;
@@ -128,6 +123,8 @@ public class FeatureTest {
saveFeatureFactory = PoolingFeature.getFactory();
saveManagerFactory = PoolingManagerImpl.getFactory();
saveDmaapFactory = DmaapManager.getFactory();
+
+ // note: invoke runSlow() to slow things down
}
@AfterClass
@@ -149,51 +146,35 @@ public class FeatureTest {
}
}
- @Ignore
@Test
public void test_SingleHost() throws Exception {
- int nmessages = 70;
-
- ctx = new Context(nmessages);
-
- ctx.addHost();
- ctx.startHosts();
-
- for (int x = 0; x < nmessages; ++x) {
- ctx.offerExternal(makeMessage(x));
- }
-
- ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
-
- assertEquals(0, ctx.getDecodeErrors());
- assertEquals(0, ctx.getRemainingEvents());
- ctx.checkAllSawAMsg();
+ run(70, 1);
}
- @Ignore
@Test
public void test_TwoHosts() throws Exception {
- int nmessages = 200;
+ run(200, 2);
+ }
+
+ @Test
+ public void test_ThreeHosts() throws Exception {
+ run(200, 3);
+ }
+ private void run(int nmessages, int nhosts) throws Exception {
ctx = new Context(nmessages);
- ctx.addHost();
- ctx.addHost();
+ for (int x = 0; x < nhosts; ++x) {
+ ctx.addHost();
+ }
+
ctx.startHosts();
for (int x = 0; x < nmessages; ++x) {
ctx.offerExternal(makeMessage(x));
}
- // wait for all hosts to have time to process a few messages
- Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
-
- // pause a topic for a bit
-// ctx.pauseTopic();
-
- // now we'll see if it recovers
-
- ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+ ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
assertEquals(0, ctx.getDecodeErrors());
assertEquals(0, ctx.getRemainingEvents());
@@ -203,6 +184,21 @@ public class FeatureTest {
private String makeMessage(int reqnum) {
return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
}
+
+ /**
+ * Invoke this to slow the timers down.
+ */
+ protected static void runSlow() {
+ stdReactivateWaitMs = 10000;
+ stdIdentificationMs = 10000;
+ stdStartHeartbeatMs = 15000;
+ stdActiveHeartbeatMs = 12000;
+ stdInterHeartbeatMs = 5000;
+ stdOfflinePubWaitMs = 2;
+ stdPollMs = 2;
+ stdInterPollMs = 2000;
+ stdEventWaitSec = 1000;
+ }
/**
* Context used for a single test case.
@@ -244,12 +240,6 @@ public class FeatureTest {
private final CountDownLatch eventCounter;
/**
- * Maps host name to its topic source. This must be in sorted order so we can
- * identify the source for the host with the higher name.
- */
- private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
-
- /**
* The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
* {@link #getCurrentHost()}.
*/
@@ -280,9 +270,14 @@ public class FeatureTest {
/**
* Creates and adds a new host to the context.
+ *
+ * @return the new Host
*/
- public void addHost() {
- hosts.add(new Host(this));
+ public Host addHost() {
+ Host host = new Host(this);
+ hosts.add(host);
+
+ return host;
}
/**
@@ -443,26 +438,6 @@ public class FeatureTest {
}
/**
- * Associates a host with a topic.
- *
- * @param host
- * @param topic
- */
- public void addTopicSource(String host, TopicSourceImpl topic) {
- host2topic.put(host, topic);
- }
-
- /**
- * Pauses the last topic source long enough to miss a heart beat.
- */
- public void pauseTopic() {
- Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
- if (ent != null) {
- ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
- }
- }
-
- /**
* Gets the current host, provided this is used from within a call to
* {@link #withHost(Host, VoidFunction)}.
*
@@ -527,12 +502,10 @@ public class FeatureTest {
}
/**
- * Gets the host name. This should only be invoked within {@link #start()}.
- *
* @return the host name
*/
public String getName() {
- return PoolingManagerImpl.getLastHost();
+ return feature.getHost();
}
/**
@@ -759,11 +732,6 @@ public class FeatureTest {
private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
/**
- * Time, in milliseconds, to pause before polling for more messages.
- */
- private AtomicLong pauseTimeMs = new AtomicLong(0);
-
- /**
*
* @param context
* @param internal {@code true} if to read from the internal topic, {@code false}
@@ -771,12 +739,8 @@ public class FeatureTest {
*/
public TopicSourceImpl(Context context, boolean internal) {
if (internal) {
- Host host = context.getCurrentHost();
-
this.topic = INTERNAL_TOPIC;
- this.queue = host.getInternalQueue();
-
- context.addTopicSource(host.getName(), this);
+ this.queue = context.getCurrentHost().getInternalQueue();
} else {
this.topic = EXTERNAL_TOPIC;
@@ -809,11 +773,12 @@ public class FeatureTest {
reregister(newPair);
- new Thread(() -> {
+ Thread thread = new Thread(() -> {
+
try {
do {
processMessages(newPair.first(), listener);
- } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+ } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
logger.info("topic source thread completed");
@@ -827,7 +792,10 @@ public class FeatureTest {
newPair.second().countDown();
- }).start();
+ });
+
+ thread.setDaemon(true);
+ thread.start();
}
/**
@@ -879,19 +847,7 @@ public class FeatureTest {
}
/**
- * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
- * pause a bit.
- *
- * @param timeMs time, in milliseconds, to pause
- */
- public void pause(long timeMs) {
- pauseTimeMs.set(timeMs);
- }
-
- /**
- * Polls for messages from the topic and offers them to the listener. If
- * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
- * then immediately returns.
+ * Polls for messages from the topic and offers them to the listener.
*
* @param stopped triggered if processing should stop
* @param listener
@@ -901,14 +857,7 @@ public class FeatureTest {
for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
- long ptm = pauseTimeMs.getAndSet(0);
- if (ptm != 0) {
- logger.warn("pause processing");
- stopped.await(ptm, TimeUnit.MILLISECONDS);
- return;
- }
-
- String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+ String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
if (msg == null) {
return;
}
@@ -1038,18 +987,20 @@ public class FeatureTest {
props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
- props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
- props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
- props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
- props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
- props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
- "" + STD_ACTIVE_HEARTBEAT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
- props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
- "" + STD_OFFLINE_PUB_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+ props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+ "" + stdOfflinePubWaitMs);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdStartHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
+ props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
+ props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdActiveHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdInterHeartbeatMs);
return props;
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
new file mode 100644
index 00000000..6884bec8
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -0,0 +1,734 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.properties.PolicyProperties;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object. Uses real feature objects, as well as real DMaaP sources and
+ * sinks. However, the following are not:
+ * <dl>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
+ *
+ * <p>
+ * The following fields must be set before executing this:
+ * <ul>
+ * <li>UEB_SERVERS</li>
+ * <li>INTERNAL_TOPIC</li>
+ * <li>EXTERNAL_TOPIC</li>
+ * </ul>
+ */
+public class FeatureTest2 {
+
+ private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class);
+
+ /**
+ * UEB servers for both internal & external topics.
+ */
+ private static final String UEB_SERVERS = "";
+
+ /**
+ * Name of the topic used for inter-host communication.
+ */
+ private static final String INTERNAL_TOPIC = "";
+
+ /**
+ * Name of the topic from which "external" events "arrive".
+ */
+ private static final String EXTERNAL_TOPIC = "";
+
+ /**
+ * Consumer group to use when polling the external topic.
+ */
+ private static final String EXTERNAL_GROUP = FeatureTest2.class.getName();
+
+ /**
+ * Name of the controller.
+ */
+ private static final String CONTROLLER1 = "controller.one";
+
+ /**
+ * Maximum number of items to fetch from DMaaP in a single poll.
+ */
+ private static final String FETCH_LIMIT = "5";
+
+ private static final long STD_REACTIVATE_WAIT_MS = 10000;
+ private static final long STD_IDENTIFICATION_MS = 10000;
+ private static final long STD_START_HEARTBEAT_MS = 15000;
+ private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
+ private static final long STD_INTER_HEARTBEAT_MS = 5000;
+ private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ private static final long EVENT_WAIT_SEC = 15;
+
+ // these are saved and restored on exit from this test class
+ private static PoolingFeature.Factory saveFeatureFactory;
+ private static PoolingManagerImpl.Factory saveManagerFactory;
+
+ /**
+ * Sink for external DMaaP topic.
+ */
+ private static TopicSink externalSink;
+
+ /**
+ * Context for the current test case.
+ */
+ private Context ctx;
+
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveFeatureFactory = PoolingFeature.getFactory();
+ saveManagerFactory = PoolingManagerImpl.getFactory();
+
+ Properties props = makeSinkProperties(EXTERNAL_TOPIC);
+ externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+ externalSink.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PoolingFeature.setFactory(saveFeatureFactory);
+ PoolingManagerImpl.setFactory(saveManagerFactory);
+
+ externalSink.stop();
+ }
+
+ @Before
+ public void setUp() {
+ ctx = null;
+ }
+
+ @After
+ public void tearDown() {
+ if (ctx != null) {
+ ctx.destroy();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void test_SingleHost() throws Exception {
+ run(70, 1);
+ }
+
+ @Ignore
+ @Test
+ public void test_TwoHosts() throws Exception {
+ run(200, 2);
+ }
+
+ @Ignore
+ @Test
+ public void test_ThreeHosts() throws Exception {
+ run(200, 3);
+ }
+
+ private void run(int nmessages, int nhosts) throws Exception {
+ ctx = new Context(nmessages);
+
+ for (int x = 0; x < nhosts; ++x) {
+ ctx.addHost();
+ }
+
+ ctx.startHosts();
+ ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ private String makeMessage(int reqnum) {
+ return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+ }
+
+ private static Properties makeSinkProperties(String topic) {
+ Properties props = new Properties();
+
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+ return props;
+ }
+
+ private static Properties makeSourceProperties(String topic) {
+ Properties props = new Properties();
+
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+ props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+ if (EXTERNAL_TOPIC.equals(topic)) {
+ // consumer group is a constant
+ props.setProperty(
+ PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
+ EXTERNAL_GROUP);
+
+ // consumer instance is generated by the BusConsumer code
+ }
+
+ // else internal topic: feature populates info for internal topic
+
+ return props;
+ }
+
+ /**
+ * Context used for a single test case.
+ */
+ private static class Context {
+
+ private final FeatureFactory featureFactory;
+ private final ManagerFactory managerFactory;
+
+ /**
+ * Hosts that have been added to this context.
+ */
+ private final Deque<Host> hosts = new LinkedList<>();
+
+ /**
+ * Maps a drools controller to its policy controller.
+ */
+ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+ /**
+ * Counts the number of decode errors.
+ */
+ private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
+
+ /**
+ * Number of events we're still waiting to receive.
+ */
+ private final CountDownLatch eventCounter;
+
+ /**
+ *
+ * @param nEvents number of events to be processed
+ */
+ public Context(int nEvents) {
+ featureFactory = new FeatureFactory(this);
+ managerFactory = new ManagerFactory(this);
+ eventCounter = new CountDownLatch(nEvents);
+
+ PoolingFeature.setFactory(featureFactory);
+ PoolingManagerImpl.setFactory(managerFactory);
+ }
+
+ /**
+ * Destroys the context, stopping any hosts that remain.
+ */
+ public void destroy() {
+ stopHosts();
+ hosts.clear();
+ }
+
+ /**
+ * Creates and adds a new host to the context.
+ *
+ * @return the new Host
+ */
+ public Host addHost() {
+ Host host = new Host(this);
+ hosts.add(host);
+
+ return host;
+ }
+
+ /**
+ * Starts the hosts.
+ */
+ public void startHosts() {
+ hosts.forEach(host -> host.start());
+ }
+
+ /**
+ * Stops the hosts.
+ */
+ public void stopHosts() {
+ hosts.forEach(host -> host.stop());
+ }
+
+ /**
+ * Verifies that all hosts processed at least one message.
+ */
+ public void checkAllSawAMsg() {
+ int x = 0;
+ for (Host host : hosts) {
+ assertTrue("x=" + x, host.messageSeen());
+ ++x;
+ }
+ }
+
+ /**
+ * Offers an event to the external topic.
+ *
+ * @param event
+ */
+ public void offerExternal(String event) {
+ externalSink.send(event);
+ }
+
+ /**
+ * Decodes an event.
+ *
+ * @param event
+ * @return the decoded event, or {@code null} if it cannot be decoded
+ */
+ public Object decodeEvent(String event) {
+ return managerFactory.decodeEvent(null, null, event);
+ }
+
+ /**
+ * Associates a controller with its drools controller.
+ *
+ * @param controller
+ * @param droolsController
+ */
+ public void addController(PolicyController controller, DroolsController droolsController) {
+ drools2policy.put(droolsController, controller);
+ }
+
+ /**
+ * @param droolsController
+ * @return the controller associated with a drools controller, or {@code null} if
+ * it has no associated controller
+ */
+ public PolicyController getController(DroolsController droolsController) {
+ return drools2policy.get(droolsController);
+ }
+
+ /**
+ *
+ * @return the number of decode errors so far
+ */
+ public int getDecodeErrors() {
+ return nDecodeErrors.get();
+ }
+
+ /**
+ * Increments the count of decode errors.
+ */
+ public void bumpDecodeErrors() {
+ nDecodeErrors.incrementAndGet();
+ }
+
+ /**
+ *
+ * @return the number of events that haven't been processed
+ */
+ public long getRemainingEvents() {
+ return eventCounter.getCount();
+ }
+
+ /**
+ * Adds an event to the counter.
+ */
+ public void addEvent() {
+ eventCounter.countDown();
+ }
+
+ /**
+ * Waits, for a period of time, for all events to be processed.
+ *
+ * @param time
+ * @param units
+ * @return {@code true} if all events have been processed, {@code false} otherwise
+ * @throws InterruptedException
+ */
+ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+ return eventCounter.await(time, units);
+ }
+
+ /**
+ * Waits, for a period of time, for all hosts to enter the Active state.
+ *
+ * @param timeMs maximum time to wait, in milliseconds
+ * @throws InterruptedException
+ */
+ public void awaitAllActive(long timeMs) throws InterruptedException {
+ long tend = timeMs + System.currentTimeMillis();
+
+ for (Host host : hosts) {
+ long tremain = Math.max(0, tend - System.currentTimeMillis());
+ assertTrue(host.awaitActive(tremain));
+ }
+ }
+ }
+
+ /**
+ * Simulates a single "host".
+ */
+ private static class Host {
+
+ private final PoolingFeature feature = new PoolingFeature();
+
+ /**
+ * {@code True} if this host has processed a message, {@code false} otherwise.
+ */
+ private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+ private final TopicSource externalSource;
+
+ // mock objects
+ private final PolicyEngine engine = mock(PolicyEngine.class);
+ private final ListenerController controller = mock(ListenerController.class);
+ private final DroolsController drools = mock(DroolsController.class);
+
+ /**
+ *
+ * @param context
+ */
+ public Host(Context context) {
+
+ when(controller.getName()).thenReturn(CONTROLLER1);
+ when(controller.getDrools()).thenReturn(drools);
+
+ Properties props = makeSourceProperties(EXTERNAL_TOPIC);
+ externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+
+ // stop consuming events if the controller stops
+ when(controller.stop()).thenAnswer(args -> {
+ externalSource.unregister(controller);
+ return true;
+ });
+
+ doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+ context.addController(controller, drools);
+ }
+
+ /**
+ * Waits, for a period of time, for the host to enter the Active state.
+ *
+ * @param timeMs time to wait, in milliseconds
+ * @return {@code true} if the host entered the Active state within the given
+ * amount of time, {@code false} otherwise
+ * @throws InterruptedException
+ */
+ public boolean awaitActive(long timeMs) throws InterruptedException {
+ return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Starts threads for the host so that it begins consuming from both the external
+ * "DMaaP" topic and its own internal "DMaaP" topic.
+ */
+ public void start() {
+ feature.beforeStart(engine);
+ feature.afterCreate(controller);
+
+ feature.beforeStart(controller);
+
+ // start consuming events from the external topic
+ externalSource.register(controller);
+
+ feature.afterStart(controller);
+ }
+
+ /**
+ * Stops the host's threads.
+ */
+ public void stop() {
+ feature.beforeStop(controller);
+ externalSource.unregister(controller);
+ feature.afterStop(controller);
+ }
+
+ /**
+ * Offers an event to the feature, before the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ return feature.beforeOffer(controller, protocol, topic2, event);
+ }
+
+ /**
+ * Offers an event to the feature, after the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic
+ * @param event
+ * @param success
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+ return feature.afterOffer(controller, protocol, topic, event, success);
+ }
+
+ /**
+ * Offers an event to the feature, before the drools controller handles it.
+ *
+ * @param fact
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeInsert(Object fact) {
+ return feature.beforeInsert(drools, fact);
+ }
+
+ /**
+ * Offers an event to the feature, after the drools controller handles it.
+ *
+ * @param fact
+ * @param successInsert {@code true} if it was successfully inserted by the drools
+ * controller, {@code false} otherwise
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterInsert(Object fact, boolean successInsert) {
+ return feature.afterInsert(drools, fact, successInsert);
+ }
+
+ /**
+ * Indicates that a message was seen for this host.
+ */
+ public void sawMessage() {
+ sawMsg.set(true);
+ }
+
+ /**
+ *
+ * @return {@code true} if a message was seen for this host, {@code false}
+ * otherwise
+ */
+ public boolean messageSeen() {
+ return sawMsg.get();
+ }
+ }
+
+ /**
+ * Listener for the external topic. Simulates the actions taken by
+ * <i>AggregatedPolicyController.onTopicEvent</i>.
+ */
+ private static class MyExternalTopicListener implements Answer<Void> {
+
+ private final Context context;
+ private final Host host;
+
+ public MyExternalTopicListener(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+ }
+
+ @Override
+ public Void answer(InvocationOnMock args) throws Throwable {
+ int i = 0;
+ CommInfrastructure commType = args.getArgument(i++);
+ String topic = args.getArgument(i++);
+ String event = args.getArgument(i++);
+
+ if (host.beforeOffer(commType, topic, event)) {
+ return null;
+ }
+
+ boolean result;
+ Object fact = context.decodeEvent(event);
+
+ if (fact == null) {
+ result = false;
+ context.bumpDecodeErrors();
+
+ } else {
+ result = true;
+
+ if (!host.beforeInsert(fact)) {
+ // feature did not handle it so we handle it here
+ host.afterInsert(fact, result);
+
+ host.sawMessage();
+ context.addEvent();
+ }
+ }
+
+ host.afterOffer(commType, topic, event, result);
+ return null;
+ }
+ }
+
+ /**
+ * Simulator for the feature-level factory.
+ */
+ private static class FeatureFactory extends PoolingFeature.Factory {
+
+ private final Context context;
+
+ /**
+ *
+ * @param context
+ */
+ public FeatureFactory(Context context) {
+ this.context = context;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public Properties getProperties(String featName) {
+ Properties props = new Properties();
+
+ props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+ props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+ props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+ "" + STD_OFFLINE_PUB_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_START_HEARTBEAT_MS);
+ props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
+ props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_INTER_HEARTBEAT_MS);
+
+ props.putAll(makeSinkProperties(INTERNAL_TOPIC));
+ props.putAll(makeSourceProperties(INTERNAL_TOPIC));
+
+ return props;
+ }
+
+ @Override
+ public PolicyController getController(DroolsController droolsController) {
+ return context.getController(droolsController);
+ }
+ }
+
+ /**
+ * Simulator for the pooling manager factory.
+ */
+ private static class ManagerFactory extends PoolingManagerImpl.Factory {
+
+ /**
+ * Used to decode events from the external topic.
+ */
+ private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
+ @Override
+ protected ObjectMapper initialValue() {
+ return new ObjectMapper();
+ }
+ };
+
+ /**
+ * Used to decode events into a Map.
+ */
+ private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
+
+ /**
+ *
+ * @param context
+ */
+ public ManagerFactory(Context context) {
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public boolean canDecodeEvent(DroolsController drools, String topic) {
+ return true;
+ }
+
+ @Override
+ public Object decodeEvent(DroolsController drools, String topic, String event) {
+ try {
+ return mapper.get().readValue(event, typeRef);
+
+ } catch (IOException e) {
+ logger.warn("cannot decode external event", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Controller that also implements the {@link TopicListener} interface.
+ */
+ private static interface ListenerController extends PolicyController, TopicListener {
+
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 7782e475..f8f37559 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -22,6 +22,7 @@ package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
@@ -122,8 +123,8 @@ public class PoolingFeatureTest {
when(factory.getController(drools2)).thenReturn(controller2);
when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
- when(factory.makeManager(any(), any())).thenAnswer(args -> {
- PoolingProperties props = args.getArgument(1);
+ when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> {
+ PoolingProperties props = args.getArgument(2);
PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
@@ -149,6 +150,19 @@ public class PoolingFeatureTest {
}
@Test
+ public void testGetHost() {
+ String host = pool.getHost();
+ assertNotNull(host);
+
+ // create another and ensure it generates another host name
+ pool = new PoolingFeature();
+ String host2 = pool.getHost();
+ assertNotNull(host2);
+
+ assertTrue(!host.equals(host2));
+ }
+
+ @Test
public void testGetSequenceNumber() {
assertEquals(0, pool.getSequenceNumber());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index e32fa545..e0024b79 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -73,6 +74,7 @@ public class PoolingManagerImplTest {
protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
+ private static final String MY_HOST = "my.host";
private static final String HOST2 = "other.host";
private static final String MY_CONTROLLER = "my.controller";
@@ -111,6 +113,7 @@ public class PoolingManagerImplTest {
private DroolsController drools;
private Serializer ser;
private Factory factory;
+ private CountDownLatch active;
private PoolingManagerImpl mgr;
@@ -140,6 +143,7 @@ public class PoolingManagerImplTest {
futures = new LinkedList<>();
ser = new Serializer();
+ active = new CountDownLatch(1);
factory = mock(Factory.class);
eventQueue = mock(EventQueue.class);
@@ -151,7 +155,7 @@ public class PoolingManagerImplTest {
when(factory.makeEventQueue(any())).thenReturn(eventQueue);
when(factory.makeClassExtractors(any())).thenReturn(extractors);
- when(factory.makeDmaapManager(any())).thenReturn(dmaap);
+ when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
when(factory.makeScheduler()).thenReturn(sched);
when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
@@ -179,12 +183,12 @@ public class PoolingManagerImplTest {
PoolingManagerImpl.setFactory(factory);
- mgr = new PoolingManagerImpl(controller, poolProps);
+ mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
}
@Test
- public void testPoolingManagerImpl() {
- mgr = new PoolingManagerImpl(controller, poolProps);
+ public void testPoolingManagerImpl() throws Exception {
+ verify(factory).makeDmaapManager(any(), any());
State st = mgr.getCurrent();
assertTrue(st instanceof IdleState);
@@ -202,7 +206,7 @@ public class PoolingManagerImplTest {
PolicyController ctlr = mock(PolicyController.class);
PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
- xxx -> new PoolingManagerImpl(ctlr, poolProps));
+ () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
assertNotNull(ex.getCause());
assertTrue(ex.getCause() instanceof ClassCastException);
}
@@ -211,23 +215,28 @@ public class PoolingManagerImplTest {
public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
// throw an exception when we try to create the dmaap manager
PoolingFeatureException ex = new PoolingFeatureException();
- when(factory.makeDmaapManager(any())).thenThrow(ex);
+ when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
- xxx -> new PoolingManagerImpl(controller, poolProps));
+ () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
assertEquals(ex, ex2.getCause());
}
@Test
- public void testGetHost() {
- String host = mgr.getHost();
- assertNotNull(host);
+ public void testGetCurrent() throws Exception {
+ assertEquals(IdleState.class, mgr.getCurrent().getClass());
+
+ startMgr();
+
+ assertEquals(StartState.class, mgr.getCurrent().getClass());
+ }
- // create another manager and ensure it generates a different host
- mgr = new PoolingManagerImpl(controller, poolProps);
+ @Test
+ public void testGetHost() {
+ assertEquals(MY_HOST, mgr.getHost());
- assertNotNull(mgr.getHost());
- assertFalse(host.equals(mgr.getHost()));
+ mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
+ assertEquals(HOST2, mgr.getHost());
}
@Test
@@ -268,7 +277,7 @@ public class PoolingManagerImplTest {
PoolingFeatureException ex = new PoolingFeatureException();
doThrow(ex).when(dmaap).startPublisher();
- PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, xxx -> mgr.beforeStart());
+ PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
assertEquals(ex, ex2);
// should never start the scheduler
@@ -453,8 +462,9 @@ public class PoolingManagerImplTest {
// should have set the new filter
verify(dmaap, times(++ntimes)).setFilter(any());
- // should have cancelled the timer
- assertEquals(1, futures.size());
+ // should have cancelled the timers
+ assertEquals(2, futures.size());
+ verify(futures.poll()).cancel(false);
verify(futures.poll()).cancel(false);
/*
@@ -465,8 +475,9 @@ public class PoolingManagerImplTest {
// should have set the new filter
verify(dmaap, times(++ntimes)).setFilter(any());
- // timer should still be active
- assertEquals(1, futures.size());
+ // new timers should now be active
+ assertEquals(2, futures.size());
+ verify(futures.poll(), never()).cancel(false);
verify(futures.poll(), never()).cancel(false);
}
@@ -548,7 +559,7 @@ public class PoolingManagerImplTest {
ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
- verify(sched).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+ verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
unitCap.capture());
assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
@@ -987,7 +998,7 @@ public class PoolingManagerImplTest {
// route the message to this host
mgr.startDistributing(makeAssignments(true));
-
+
// generate RuntimeException when onTopicEvent() is invoked
doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
@@ -1056,21 +1067,27 @@ public class PoolingManagerImplTest {
startMgr();
// route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
+ assertNotNull(mgr.startDistributing(makeAssignments(true)));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(eventQueue, never()).add(any());
- // null assignments should be ignored
- mgr.startDistributing(null);
+ // null assignments should cause message to be queued
+ assertNull(mgr.startDistributing(null));
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(eventQueue).add(any());
+
+ // route the message to this host
+ assertNotNull(mgr.startDistributing(makeAssignments(true)));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(eventQueue).add(any());
// route the message to the other host
- mgr.startDistributing(makeAssignments(false));
-
+ assertNotNull(mgr.startDistributing(makeAssignments(false)));
assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(eventQueue).add(any());
}
@Test
@@ -1086,7 +1103,8 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to this host
- assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS));
+ CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
// all of the events should have been processed locally
verify(dmaap, times(START_PUB)).publish(any());
@@ -1106,7 +1124,8 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to the OTHER host
- assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
+ CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
// all of the events should have been forwarded
verify(dmaap, times(4)).publish(any());
@@ -1142,6 +1161,7 @@ public class PoolingManagerImplTest {
assertTrue(st instanceof ActiveState);
assertEquals(mgr.getHost(), st.getHost());
assertEquals(asgn, mgr.getAssignments());
+ assertEquals(0, active.getCount());
}
@Test
@@ -1149,6 +1169,7 @@ public class PoolingManagerImplTest {
State st = mgr.goInactive();
assertTrue(st instanceof InactiveState);
assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(1, active.getCount());
}
@Test
@@ -1330,7 +1351,7 @@ public class PoolingManagerImplTest {
*/
private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
try {
- func.apply(null);
+ func.apply();
throw new AssertionError("missing exception");
} catch (Exception e) {
@@ -1349,10 +1370,9 @@ public class PoolingManagerImplTest {
/**
* Invokes the function.
*
- * @param arg always {@code null}
* @throws T if an error occurs
*/
- public void apply(Void arg) throws T;
+ public void apply() throws T;
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
index 2d734c1c..459c770a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
@@ -48,13 +48,13 @@ public class PoolingPropertiesTest {
public static final boolean STD_FEATURE_ENABLED = true;
public static final int STD_OFFLINE_LIMIT = 10;
public static final long STD_OFFLINE_AGE_MS = 1000L;
- public static final long STD_START_HEARTBEAT_MS = 2000L;
- public static final long STD_REACTIVATE_MS = 3000L;
- public static final long STD_IDENTIFICATION_MS = 4000L;
- public static final long STD_LEADER_MS = 5000L;
- public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L;
- public static final long STD_INTER_HEARTBEAT_MS = 7000L;
- public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L;
+ public static final long STD_OFFLINE_PUB_WAIT_MS = 2000L;
+ public static final long STD_START_HEARTBEAT_MS = 3000L;
+ public static final long STD_REACTIVATE_MS = 4000L;
+ public static final long STD_IDENTIFICATION_MS = 5000L;
+ public static final long STD_LEADER_MS = 6000L;
+ public static final long STD_ACTIVE_HEARTBEAT_MS = 7000L;
+ public static final long STD_INTER_HEARTBEAT_MS = 8000L;
private Properties plain;
private PoolingProperties pooling;
@@ -99,8 +99,13 @@ public class PoolingPropertiesTest {
}
@Test
+ public void testGetOfflinePubWaitMs() throws PropertyException {
+ doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
+ }
+
+ @Test
public void testGetStartHeartbeatMs() throws PropertyException {
- doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 50000L, xxx -> pooling.getStartHeartbeatMs());
+ doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 100000L, xxx -> pooling.getStartHeartbeatMs());
}
@Test
@@ -123,11 +128,6 @@ public class PoolingPropertiesTest {
doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
}
- @Test
- public void testGetOfflinePubWaitMs() throws PropertyException {
- doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
- }
-
/**
* Tests a particular property. Verifies that the correct value is returned if the
* specialized property has a value or the property has no value. Also verifies that
@@ -174,12 +174,12 @@ public class PoolingPropertiesTest {
props.setProperty(specialize(FEATURE_ENABLED, CONTROLLER), "" + STD_FEATURE_ENABLED);
props.setProperty(specialize(OFFLINE_LIMIT, CONTROLLER), "" + STD_OFFLINE_LIMIT);
props.setProperty(specialize(OFFLINE_AGE_MS, CONTROLLER), "" + STD_OFFLINE_AGE_MS);
+ props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
props.setProperty(specialize(START_HEARTBEAT_MS, CONTROLLER), "" + STD_START_HEARTBEAT_MS);
props.setProperty(specialize(REACTIVATE_MS, CONTROLLER), "" + STD_REACTIVATE_MS);
props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS);
props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS);
props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS);
- props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
return props;
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
index 505dc400..d09650db 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
@@ -123,6 +123,24 @@ public class SpecPropertiesTest {
}
@Test
+ public void testSpecPropertiesStringStringProperties_EmptyPrefix() {
+ supportingProps = new Properties();
+
+ supportingProps.setProperty(PROP_NO_PREFIX, VAL_NO_PREFIX);
+ supportingProps.setProperty("a.value", VAL_GEN);
+ supportingProps.setProperty("b.value", VAL_GEN);
+ supportingProps.setProperty(MY_SPEC + ".b.value", VAL_SPEC);
+
+ // no supporting properties
+ props = new SpecProperties("", MY_SPEC, supportingProps);
+
+ assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX)));
+ assertEquals(VAL_GEN, props.getProperty(gen("a.value")));
+ assertEquals(VAL_SPEC, props.getProperty(MY_SPEC + ".b.value"));
+ assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+ }
+
+ @Test
public void testWithTrailingDot() {
// neither has trailing dot
assertEquals(PREFIX_GEN, props.getPrefix());
@@ -132,6 +150,16 @@ public class SpecPropertiesTest {
props = new SpecProperties(PREFIX_GEN, MY_SPEC + ".");
assertEquals(PREFIX_GEN, props.getPrefix());
assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+ // first is empty
+ props = new SpecProperties("", MY_SPEC);
+ assertEquals("", props.getPrefix());
+ assertEquals(MY_SPEC + ".", props.getSpecPrefix());
+
+ // second is empty
+ props = new SpecProperties(PREFIX_GEN, "");
+ assertEquals(PREFIX_GEN, props.getPrefix());
+ assertEquals(PREFIX_GEN, props.getSpecPrefix());
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index 7b4b0602..27284dcd 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -298,18 +298,18 @@ public class ActiveStateTest extends BasicStateTester {
// heart beat generator
timer = repeatedTasks.remove();
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
// my heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
// predecessor's heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
}
@Test
@@ -327,13 +327,13 @@ public class ActiveStateTest extends BasicStateTester {
// heart beat generator
timer = repeatedTasks.remove();
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
// my heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
}
@Test
@@ -389,13 +389,13 @@ public class ActiveStateTest extends BasicStateTester {
// set up next state
State next = mock(State.class);
- when(mgr.goInactive()).thenReturn(next);
+ when(mgr.goStart()).thenReturn(next);
// fire the task - should transition
assertEquals(next, task.third().fire());
- // should indicate failure
- verify(mgr).internalTopicFailed();
+ // should stop distributing
+ verify(mgr).startDistributing(null);
// should publish an offline message
Offline msg = captureAdminMessage(Offline.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
index 394adaee..ae53ce05 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -22,12 +22,19 @@ package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
import org.onap.policy.drools.utils.Pair;
public class InactiveStateTest extends BasicStateTester {
@@ -53,6 +60,42 @@ public class InactiveStateTest extends BasicStateTester {
}
@Test
+ public void testProcessLeader() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ String[] arr = {PREV_HOST, MY_HOST, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ public void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
public void testGoInatcive() {
assertNull(state.goInactive());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
index e1718418..7ac58439 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -65,39 +65,6 @@ public class ProcessingStateTest extends BasicStateTester {
}
@Test
- public void testGoActive_WithAssignment() {
- State act = mock(State.class);
- State inact = mock(State.class);
-
- when(mgr.goActive()).thenReturn(act);
- when(mgr.goInactive()).thenReturn(inact);
-
- String[] arr = {HOST2, PREV_HOST, MY_HOST};
- BucketAssignments asgn = new BucketAssignments(arr);
-
- assertEquals(act, state.goActive(asgn));
-
- verify(mgr).startDistributing(asgn);
- }
-
- @Test
- public void testGoActive_WithoutAssignment() {
- State act = mock(State.class);
- State inact = mock(State.class);
-
- when(mgr.goActive()).thenReturn(act);
- when(mgr.goInactive()).thenReturn(inact);
-
- String[] arr = {HOST2, PREV_HOST};
- BucketAssignments asgn = new BucketAssignments(arr);
-
- assertEquals(inact, state.goActive(asgn));
-
- verify(mgr).startDistributing(asgn);
-
- }
-
- @Test
public void testProcessQuery() {
State next = mock(State.class);
when(mgr.goQuery()).thenReturn(next);
@@ -155,13 +122,6 @@ public class ProcessingStateTest extends BasicStateTester {
}
@Test
- public void testMakeIdentification() {
- Identification ident = state.makeIdentification();
- assertEquals(MY_HOST, ident.getSource());
- assertEquals(ASGN3, ident.getAssignments());
- }
-
- @Test
public void testGetAssignments() {
// assignments from constructor
assertEquals(ASGN3, state.getAssignments());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index a7c3a3d5..80778ed4 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -254,10 +253,13 @@ public class QueryStateTest extends BasicStateTester {
// should published an Offline message and go inactive
State next = mock(State.class);
- when(mgr.goInactive()).thenReturn(next);
+ when(mgr.goStart()).thenReturn(next);
assertEquals(next, timer.second().fire());
+ // should stop distributing
+ verify(mgr).startDistributing(null);
+
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());
}
@@ -343,21 +345,6 @@ public class QueryStateTest extends BasicStateTester {
}
@Test
- public void testHasAssignment() {
- // null assignment
- mgr.startDistributing(null);
- assertFalse(state.hasAssignment());
-
- // not in assignments
- state.setAssignments(new BucketAssignments(new String[] {HOST3}));
- assertFalse(state.hasAssignment());
-
- // it IS in the assignments
- state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
- assertTrue(state.hasAssignment());
- }
-
- @Test
public void testRecordInfo_NullSource() {
state.setAssignments(ASGN3);
state.setLeader(MY_HOST);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index af4e8f13..ee4c1add 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
@@ -39,6 +40,7 @@ import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.Triple;
public class StartStateTest extends BasicStateTester {
@@ -78,15 +80,36 @@ public class StartStateTest extends BasicStateTester {
assertEquals(MY_HOST, msg.first());
assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs());
- Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
- assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue());
+ /*
+ * Verify heartbeat generator
+ */
+ Triple<Long, Long, StateTimerTask> generator = repeatedTasks.removeFirst();
+
+ assertEquals(STD_INTER_HEARTBEAT_MS, generator.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, generator.second().longValue());
+
+ // invoke the task - it should generate another heartbeat
+ assertEquals(null, generator.third().fire());
+ verify(mgr, times(2)).publish(MY_HOST, msg.second());
+
+ // and again
+ assertEquals(null, generator.third().fire());
+ verify(mgr, times(3)).publish(MY_HOST, msg.second());
+
+
+ /*
+ * Verify heartbeat checker
+ */
+ Pair<Long, StateTimerTask> checker = onceTasks.removeFirst();
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue());
// invoke the task - it should go to the state returned by the mgr
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, checker.second().fire());
verify(mgr).internalTopicFailed();
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index a184dfad..47624aa0 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -154,12 +154,48 @@ public class StateTest extends BasicStateTester {
}
@Test
- public void testGoActive() {
- State next = mock(State.class);
- when(mgr.goActive()).thenReturn(next);
+ public void testGoActive_WithAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
- State next2 = state.goActive();
- assertEquals(next, next2);
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(act, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testGoActive_WithoutAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(inact, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testGoActive_NullAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ assertEquals(inact, state.goActive(null));
+
+ verify(mgr, never()).startDistributing(any());
}
@Test
@@ -375,6 +411,20 @@ public class StateTest extends BasicStateTester {
}
@Test
+ public void testMissedHeartbeat() {
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ State next2 = state.missedHeartbeat();
+ assertEquals(next, next2);
+
+ verify(mgr).startDistributing(null);
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
public void testInternalTopicFailed() {
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
@@ -398,6 +448,13 @@ public class StateTest extends BasicStateTester {
}
@Test
+ public void testMakeIdentification() {
+ Identification ident = state.makeIdentification();
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
public void testMakeOffline() {
Offline msg = state.makeOffline();