diff options
Diffstat (limited to 'feature-pooling-dmaap')
25 files changed, 1286 insertions, 413 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 102eda75..abb4da33 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -202,6 +202,7 @@ public class DmaapManager { } catch (InterruptedException e) { logger.warn("message transmission stopped due to {}", e.getMessage()); + Thread.currentThread().interrupt(); } try { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 21cbc4db..1e2071ab 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -21,7 +21,9 @@ package org.onap.policy.drools.pooling; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; @@ -54,6 +56,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF private static Factory factory; /** + * ID of this host. + */ + private final String host; + + /** * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; @@ -61,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Maps a controller name to its associated manager. */ - private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + + /** + * Decremented each time a manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch = new CountDownLatch(1); /** * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is @@ -75,6 +87,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF */ public PoolingFeature() { super(); + + this.host = UUID.randomUUID().toString(); } protected static Factory getFactory() { @@ -90,6 +104,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingFeature.factory = factory; } + public String getHost() { + return host; + } + + /** + * @return a latch that will be decremented when a manager enters the active state + */ + protected CountDownLatch getActiveLatch() { + return activeLatch; + } + @Override public int getSequenceNumber() { return 0; @@ -123,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingProperties props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); - ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props)); + ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name, e); @@ -371,12 +396,15 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Makes a pooling manager for a controller. * + * @param host name/uuid of this host * @param controller * @param props properties to use to configure the manager + * @param activeLatch decremented when the manager goes Active * @return a new pooling manager */ - public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) { - return new PoolingManagerImpl(controller, props); + public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + return new PoolingManagerImpl(host, controller, props, activeLatch); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 422efdd7..de25e471 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; +import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static Factory factory = new Factory(); /** - * ID of the last host that was created. - */ - private static final AtomicReference<String> lastHost = new AtomicReference<>(null); - - /** * ID of this host. */ private final String host; @@ -100,6 +94,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final TopicListener listener; /** + * Decremented each time the manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch; + + /** * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -148,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Queue used when no bucket assignments are available. */ - private EventQueue eventq; + private final EventQueue eventq; /** * {@code True} if events offered by the controller should be intercepted, @@ -158,28 +157,28 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Constructs the manager, initializing all of the data structures. - * + * + * @param host name/uuid of this host * @param controller controller with which this is associated * @param props feature properties specific to the controller + * @param activeLatch latch to be decremented each time the manager enters the Active + * state */ - public PoolingManagerImpl(PolicyController controller, PoolingProperties props) { - this.host = UUID.randomUUID().toString(); + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + this.host = host; this.controller = controller; this.props = props; - - lastHost.set(this.host); + this.activeLatch = activeLatch; try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - - SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), - props.getSource()); - this.extractors = factory.makeClassExtractors(spec); - - this.dmaapMgr = factory.makeDmaapManager(props); + this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), + makeDmaapProps(controller, props.getSource())); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -203,16 +202,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Used by junit tests. - * - * @return the ID of the last host that was created, or {@code null} if no hosts have - * been created yet - */ - protected static String getLastHost() { - return lastHost.get(); - } - - /** * Should only be used by junit tests. * * @return the current state @@ -237,6 +226,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Makes properties for configuring extractors. + * + * @param controller the controller for which the extractors will be configured + * @param source properties from which to get the extractor properties + * @return extractor properties + */ + private Properties makeExtractorProps(PolicyController controller, Properties source) { + return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source); + } + + /** + * Makes properties for configuring DMaaP. Copies properties from the source that + * start with the Pooling property prefix followed by the controller name, stripping + * the prefix and controller name. + * + * @param controller the controller for which DMaaP will be configured + * @param source properties from which to get the DMaaP properties + * @return DMaaP properties + */ + private Properties makeDmaapProps(PolicyController controller, Properties source) { + SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); + + // could be UEB or DMAAP, so add both + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + + return specProps; + } + + /** + * Adds DMaaP consumer properties, consumer group & instance. The group is the host + * and the instance is a constant. + * + * @param props where to add the new properties + * @param prefix property prefix + */ + private void addDmaapConsumerProps(SpecProperties props, String prefix) { + String fullpfx = props.getSpecPrefix() + prefix + "." + topic; + + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); + } + + /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. * @@ -288,8 +321,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { scheduler = null; if (!(current instanceof IdleState)) { - dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); + dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } } @@ -405,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -420,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -609,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { - logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); + logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -751,26 +774,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public CountDownLatch startDistributing(BucketAssignments asgn) { - if (asgn == null) { - return null; - } - - logger.info("new assignments for topic {}", getTopic()); - synchronized (curLocker) { + int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); + logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); assignments = asgn; } + if (asgn == null) { + return null; + } + /* * publish the events from the event queue, but do it in a background thread so * that the state machine can enter its correct state BEFORE we start processing * the events */ CountDownLatch latch = new CountDownLatch(1); - + new Thread(() -> { synchronized (curLocker) { if (assignments == null) { + latch.countDown(); return; } @@ -779,11 +803,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { while ((ev = eventq.poll()) != null) { handle(ev); } - + latch.countDown(); } }).start(); - + return latch; } @@ -804,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public State goActive() { + activeLatch.countDown(); return new ActiveState(this); } @@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Creates a DMaaP manager. * - * @param props properties used to configure the manager + * @param topic name of the internal DMaaP topic + * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException { - return new DmaapManager(props.getPoolingTopic(), props.getSource()); + public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { + return new DmaapManager(topic, props); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java index 4e8de0d0..54319423 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration { public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled"; public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit"; public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds"; + public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds"; public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds"; public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds"; public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds"; public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds"; - public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; /** * Type of item that the extractors will be extracting. @@ -94,10 +94,17 @@ public class PoolingProperties extends SpecPropertyConfiguration { private long offlineAgeMs; /** + * Time, in milliseconds, to wait for an "Offline" message to be published + * to DMaaP. + */ + @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") + private long offlinePubWaitMs; + + /** * Time, in milliseconds, to wait for this host's heart beat during the * start-up state. */ - @Property(name = START_HEARTBEAT_MS, defaultValue = "50000") + @Property(name = START_HEARTBEAT_MS, defaultValue = "100000") private long startHeartbeatMs; /** @@ -123,19 +130,12 @@ public class PoolingProperties extends SpecPropertyConfiguration { /** * Time, in milliseconds, to wait between heart beat generations during - * the active state. + * the active and start-up states. */ @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000") private long interHeartbeatMs; /** - * Time, in milliseconds, to wait for an "Offline" message to be published - * to DMaaP. - */ - @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") - private long offlinePubWaitMs; - - /** * @param controllerName the name of the controller * @param props set of properties used to configure this * @throws PropertyException if an error occurs @@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration { return offlineAgeMs; } + public long getOfflinePubWaitMs() { + return offlinePubWaitMs; + } + public long getStartHeartbeatMs() { return startHeartbeatMs; } @@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration { public long getInterHeartbeatMs() { return interHeartbeatMs; } - - public long getOfflinePubWaitMs() { - return offlinePubWaitMs; - } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index 31b4e614..c831f706 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -40,7 +40,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any specialization + * @param prefix the property name prefix that appears before any specialization, may + * be "" * @param specialization the property name specialization (e.g., session name) */ public SpecProperties(String prefix, String specialization) { @@ -50,7 +51,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any specialization + * @param prefix the property name prefix that appears before any specialization, may + * be "" * @param specialization the property name specialization (e.g., session name) * @param props the default properties */ @@ -68,7 +70,7 @@ public class SpecProperties extends Properties { * @return the text, with a trailing "." */ private static String withTrailingDot(String text) { - return text.endsWith(".") ? text : text + "."; + return text.isEmpty() || text.endsWith(".") ? text : text + "."; } /** @@ -105,11 +107,11 @@ public class SpecProperties extends Properties { @Override public final int hashCode() { - throw new UnsupportedOperationException("HostBucket cannot be hashed"); + throw new UnsupportedOperationException("SpecProperties cannot be hashed"); } @Override public final boolean equals(Object obj) { - throw new UnsupportedOperationException("cannot compare HostBuckets"); + throw new UnsupportedOperationException("cannot compare SpecProperties"); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java index 2752ca8c..ccca77be 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java @@ -374,7 +374,7 @@ public class ClassExtractors { } catch (NoSuchMethodException expected) { // no getXxx() method, maybe there's a field by this name - logger.debug("no method {} in {}", nm, clazz.getName()); + logger.debug("no method {} in {}", nm, clazz.getName(), expected); return null; } catch (SecurityException e) { @@ -445,7 +445,7 @@ public class ClassExtractors { } catch (NoSuchFieldException expected) { // no field by this name - try super class & interfaces - logger.debug("no field {} in {}", name, clazz.getName()); + logger.debug("no field {} in {}", name, clazz.getName(), expected); } catch (SecurityException e) { throw new ExtractorException("inaccessible field " + clazz + "." + name, e); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index b0a36cd9..8f0a902a 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState { if ((succHost = assigned.higher(getHost())) == null) { // wrapped around - successor is the first host in the set succHost = assigned.first(); - logger.info("this host's successor is {} on topic {}", succHost, getTopic()); } + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); if ((predHost = assigned.lower(getHost())) == null) { // wrapped around - predecessor is the last host in the set predHost = assigned.last(); - logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } @Override public void start() { + super.start(); addTimers(); genHeartbeat(); } @@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState { /* * heart beat generator */ - long genMs = getProperties().getActiveHeartbeatMs(); + long genMs = getProperties().getInterHeartbeatMs(); scheduleWithFixedDelay(genMs, genMs, () -> { genHeartbeat(); @@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState { /* * my heart beat checker */ - long interMs = getProperties().getInterHeartbeatMs(); + long waitMs = getProperties().getActiveHeartbeatMs(); - scheduleWithFixedDelay(interMs, interMs, () -> { + scheduleWithFixedDelay(waitMs, waitMs, () -> { if (myHeartbeatSeen) { myHeartbeatSeen = false; return null; @@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState { // missed my heart beat logger.error("missed my heartbeat on topic {}", getTopic()); - return internalTopicFailed(); + return missedHeartbeat(); }); /* @@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState { */ if (!predHost.isEmpty()) { - scheduleWithFixedDelay(interMs, interMs, () -> { + scheduleWithFixedDelay(waitMs, waitMs, () -> { if (predHeartbeatSeen) { predHeartbeatSeen = false; return null; diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index 6be2fb84..f717aa52 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -44,24 +44,17 @@ public class InactiveState extends State { @Override public void start() { - super.start(); - - schedule(getProperties().getReactivateMs(), () -> goStart()); + schedule(getProperties().getReactivateMs(), this::goStart); } @Override public State process(Leader msg) { - if(isValid(msg)) { + if (isValid(msg)) { logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); - startDistributing(msg.getAssignments()); - - if(msg.getAssignments().hasAssignment(getHost())) { - logger.info("received Leader message on topic {}", getTopic()); - return goActive(); - } + return goActive(msg.getAssignments()); } - + return null; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index 1e9bb581..e9dc0324 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -74,24 +74,6 @@ public class ProcessingState extends State { } /** - * Goes active with a new set of assignments. - * - * @param asgn new assignments - * @return the new state, either Active or Inactive, depending on whether or not this - * host has an assignment - */ - protected State goActive(BucketAssignments asgn) { - startDistributing(asgn); - - if (asgn.hasAssignment(getHost())) { - return goActive(); - - } else { - return goInactive(); - } - } - - /** * Generates an Identification message and goes to the query state. */ @Override @@ -154,11 +136,11 @@ public class ProcessingState extends State { } Leader msg = makeLeader(alive); - publish(msg); + logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size()); - setAssignments(msg.getAssignments()); + publish(msg); - return goActive(); + return goActive(msg.getAssignments()); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 9045165b..1a4da150 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -63,7 +63,6 @@ public class QueryState extends ProcessingState { @Override public void start() { - super.start(); // start identification timer @@ -85,39 +84,21 @@ public class QueryState extends ProcessingState { if (!sawSelfIdent) { // didn't see our identification logger.error("missed our own Ident message on topic {}", getTopic()); - return internalTopicFailed(); + return missedHeartbeat(); } else if (isLeader()) { // "this" host is the new leader logger.info("this host is the new leader for topic {}", getTopic()); return becomeLeader(alive); - } else if (hasAssignment()) { - /* - * this host is not the new leader, but it does have an assignment - - * return to the active state while we wait for the leader - */ - logger.info("no new leader on topic {}", getTopic()); - return goActive(); - } else { - // not the leader and no assignment yet + // not the leader - return to previous state logger.info("no new leader on topic {}", getTopic()); - return goInactive(); + return goActive(getAssignments()); } }); } - /** - * Determines if this host has an assignment in the CURRENT assignments. - * - * @return {@code true} if this host has an assignment, {@code false} otherwise - */ - protected boolean hasAssignment() { - BucketAssignments asgn = getAssignments(); - return (asgn != null && asgn.hasAssignment(getHost())); - } - @Override public State goQuery() { return null; diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index a978e245..3068cfc9 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -67,8 +67,22 @@ public class StartState extends State { super.start(); - publish(getHost(), makeHeartbeat(hbTimestampMs)); + Heartbeat hb = makeHeartbeat(hbTimestampMs); + publish(getHost(), hb); + /* + * heart beat generator + */ + long genMs = getProperties().getInterHeartbeatMs(); + + scheduleWithFixedDelay(genMs, genMs, () -> { + publish(getHost(), hb); + return null; + }); + + /* + * my heart beat checker + */ schedule(getProperties().getStartHeartbeatMs(), () -> { logger.error("missed heartbeat on topic {}", getTopic()); return internalTopicFailed(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 421b9225..545c2ef0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -113,12 +113,21 @@ public abstract class State { } /** - * Transitions to the "active" state. + * Goes active with a new set of assignments. * - * @return the new state + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment */ - public final State goActive() { - return mgr.goActive(); + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn != null && asgn.hasAssignment(getHost())) { + return mgr.goActive(); + + } else { + return goInactive(); + } } /** @@ -322,7 +331,21 @@ public abstract class State { } /** - * Indicates that the internal topic failed. + * Indicates that we failed to see our own heartbeat; must be a problem with the + * internal topic. + * + * @return a new {@link StartState} + */ + protected final State missedHeartbeat() { + publish(makeOffline()); + mgr.startDistributing(null); + + return mgr.goStart(); + } + + /** + * Indicates that the internal topic failed; this should only be invoked from the + * StartState. * * @return a new {@link InactiveState} */ diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index a91671fd..a5688df6 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.LinkedList; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -211,8 +212,8 @@ public class DmaapManagerTest { // force exception when it starts doThrow(new IllegalStateException("expected")).when(sink).start(); - expectException("startPublisher,start", xxx -> mgr.startPublisher()); - expectException("startPublisher,publish", xxx -> mgr.publish(MSG)); + expectException("startPublisher,start", () -> mgr.startPublisher()); + expectException("startPublisher,publish", () -> mgr.publish(MSG)); // allow it to succeed this time reset(sink); @@ -264,11 +265,15 @@ public class DmaapManagerTest { long minms = 2000L; // tell the publisher to stop in minms + additional time - Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L)); + CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread(() -> { + latch.countDown(); + mgr.stopPublisher(minms + 3000L); + }); thread.start(); - // give the thread a chance to start - Thread.sleep(50L); + // wait for the thread to start + latch.await(); // interrupt it - it should immediately finish its work thread.interrupt(); @@ -336,7 +341,7 @@ public class DmaapManagerTest { @Test public void testPublish() throws PoolingFeatureException { // cannot publish before starting - expectException("publish,pre", xxx -> mgr.publish(MSG)); + expectException("publish,pre", () -> mgr.publish(MSG)); mgr.startPublisher(); @@ -352,7 +357,7 @@ public class DmaapManagerTest { // stop and verify we can no longer publish mgr.stopPublisher(0); - expectException("publish,stopped", xxx -> mgr.publish(MSG)); + expectException("publish,stopped", () -> mgr.publish(MSG)); } @Test(expected = PoolingFeatureException.class) @@ -377,7 +382,7 @@ public class DmaapManagerTest { private void expectException(String testnm, VFunction func) { try { - func.apply(null); + func.apply(); fail(testnm + " missing exception"); } catch (PoolingFeatureException expected) { @@ -387,6 +392,6 @@ public class DmaapManagerTest { @FunctionalInterface public static interface VFunction { - public void apply(Void arg) throws PoolingFeatureException; + public void apply() throws PoolingFeatureException; } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index 13d70f52..cc588384 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -26,13 +26,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize; import java.io.IOException; import java.util.Arrays; import java.util.Deque; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map.Entry; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; @@ -43,13 +43,11 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -71,7 +69,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; /** * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having - * its own feature object. + * its own feature object. Uses real feature objects. However, the following are not: + * <dl> + * <dt>DMaaP sources and sinks</dt> + * <dd>simulated using queues. There is one queue for the external topic, and one queue + * for each host's internal topic. Messages published to the "admin" channel are simply + * sent to all of the hosts' internal topic queues</dd> + * <dt>PolicyEngine, PolicyController, DroolsController</dt> + * <dd>mocked</dd> + * </dl> */ public class FeatureTest { @@ -92,26 +98,15 @@ public class FeatureTest { */ private static final String CONTROLLER1 = "controller.one"; - // private static final long STD_HEARTBEAT_WAIT_MS = 100; - // private static final long STD_REACTIVATE_WAIT_MS = 200; - // private static final long STD_IDENTIFICATION_MS = 60; - // private static final long STD_ACTIVE_HEARTBEAT_MS = 5; - // private static final long STD_INTER_HEARTBEAT_MS = 50; - // private static final long STD_OFFLINE_PUB_WAIT_MS = 2; - // private static final long POLL_MS = 2; - // private static final long INTER_POLL_MS = 2; - // private static final long EVENT_WAIT_SEC = 5; - - // use these to slow things down - private static final long STD_HEARTBEAT_WAIT_MS = 5000; - private static final long STD_REACTIVATE_WAIT_MS = 10000; - private static final long STD_IDENTIFICATION_MS = 10000; - private static final long STD_ACTIVE_HEARTBEAT_MS = 5000; - private static final long STD_INTER_HEARTBEAT_MS = 12000; - private static final long STD_OFFLINE_PUB_WAIT_MS = 2; - private static final long POLL_MS = 2; - private static final long INTER_POLL_MS = 2000; - private static final long EVENT_WAIT_SEC = 1000; + private static long stdReactivateWaitMs = 200; + private static long stdIdentificationMs = 60; + private static long stdStartHeartbeatMs = 60; + private static long stdActiveHeartbeatMs = 50; + private static long stdInterHeartbeatMs = 5; + private static long stdOfflinePubWaitMs = 2; + private static long stdPollMs = 2; + private static long stdInterPollMs = 2; + private static long stdEventWaitSec = 10; // these are saved and restored on exit from this test class private static PoolingFeature.Factory saveFeatureFactory; @@ -128,6 +123,8 @@ public class FeatureTest { saveFeatureFactory = PoolingFeature.getFactory(); saveManagerFactory = PoolingManagerImpl.getFactory(); saveDmaapFactory = DmaapManager.getFactory(); + + // note: invoke runSlow() to slow things down } @AfterClass @@ -149,51 +146,35 @@ public class FeatureTest { } } - @Ignore @Test public void test_SingleHost() throws Exception { - int nmessages = 70; - - ctx = new Context(nmessages); - - ctx.addHost(); - ctx.startHosts(); - - for (int x = 0; x < nmessages; ++x) { - ctx.offerExternal(makeMessage(x)); - } - - ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); - - assertEquals(0, ctx.getDecodeErrors()); - assertEquals(0, ctx.getRemainingEvents()); - ctx.checkAllSawAMsg(); + run(70, 1); } - @Ignore @Test public void test_TwoHosts() throws Exception { - int nmessages = 200; + run(200, 2); + } + + @Test + public void test_ThreeHosts() throws Exception { + run(200, 3); + } + private void run(int nmessages, int nhosts) throws Exception { ctx = new Context(nmessages); - ctx.addHost(); - ctx.addHost(); + for (int x = 0; x < nhosts; ++x) { + ctx.addHost(); + } + ctx.startHosts(); for (int x = 0; x < nmessages; ++x) { ctx.offerExternal(makeMessage(x)); } - // wait for all hosts to have time to process a few messages - Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3); - - // pause a topic for a bit -// ctx.pauseTopic(); - - // now we'll see if it recovers - - ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS); assertEquals(0, ctx.getDecodeErrors()); assertEquals(0, ctx.getRemainingEvents()); @@ -203,6 +184,21 @@ public class FeatureTest { private String makeMessage(int reqnum) { return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; } + + /** + * Invoke this to slow the timers down. + */ + protected static void runSlow() { + stdReactivateWaitMs = 10000; + stdIdentificationMs = 10000; + stdStartHeartbeatMs = 15000; + stdActiveHeartbeatMs = 12000; + stdInterHeartbeatMs = 5000; + stdOfflinePubWaitMs = 2; + stdPollMs = 2; + stdInterPollMs = 2000; + stdEventWaitSec = 1000; + } /** * Context used for a single test case. @@ -244,12 +240,6 @@ public class FeatureTest { private final CountDownLatch eventCounter; /** - * Maps host name to its topic source. This must be in sorted order so we can - * identify the source for the host with the higher name. - */ - private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>(); - - /** * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by * {@link #getCurrentHost()}. */ @@ -280,9 +270,14 @@ public class FeatureTest { /** * Creates and adds a new host to the context. + * + * @return the new Host */ - public void addHost() { - hosts.add(new Host(this)); + public Host addHost() { + Host host = new Host(this); + hosts.add(host); + + return host; } /** @@ -443,26 +438,6 @@ public class FeatureTest { } /** - * Associates a host with a topic. - * - * @param host - * @param topic - */ - public void addTopicSource(String host, TopicSourceImpl topic) { - host2topic.put(host, topic); - } - - /** - * Pauses the last topic source long enough to miss a heart beat. - */ - public void pauseTopic() { - Entry<String, TopicSourceImpl> ent = host2topic.lastEntry(); - if (ent != null) { - ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS); - } - } - - /** * Gets the current host, provided this is used from within a call to * {@link #withHost(Host, VoidFunction)}. * @@ -527,12 +502,10 @@ public class FeatureTest { } /** - * Gets the host name. This should only be invoked within {@link #start()}. - * * @return the host name */ public String getName() { - return PoolingManagerImpl.getLastHost(); + return feature.getHost(); } /** @@ -759,11 +732,6 @@ public class FeatureTest { private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null); /** - * Time, in milliseconds, to pause before polling for more messages. - */ - private AtomicLong pauseTimeMs = new AtomicLong(0); - - /** * * @param context * @param internal {@code true} if to read from the internal topic, {@code false} @@ -771,12 +739,8 @@ public class FeatureTest { */ public TopicSourceImpl(Context context, boolean internal) { if (internal) { - Host host = context.getCurrentHost(); - this.topic = INTERNAL_TOPIC; - this.queue = host.getInternalQueue(); - - context.addTopicSource(host.getName(), this); + this.queue = context.getCurrentHost().getInternalQueue(); } else { this.topic = EXTERNAL_TOPIC; @@ -809,11 +773,12 @@ public class FeatureTest { reregister(newPair); - new Thread(() -> { + Thread thread = new Thread(() -> { + try { do { processMessages(newPair.first(), listener); - } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS)); + } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS)); logger.info("topic source thread completed"); @@ -827,7 +792,10 @@ public class FeatureTest { newPair.second().countDown(); - }).start(); + }); + + thread.setDaemon(true); + thread.start(); } /** @@ -879,19 +847,7 @@ public class FeatureTest { } /** - * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should - * pause a bit. - * - * @param timeMs time, in milliseconds, to pause - */ - public void pause(long timeMs) { - pauseTimeMs.set(timeMs); - } - - /** - * Polls for messages from the topic and offers them to the listener. If - * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and - * then immediately returns. + * Polls for messages from the topic and offers them to the listener. * * @param stopped triggered if processing should stop * @param listener @@ -901,14 +857,7 @@ public class FeatureTest { for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) { - long ptm = pauseTimeMs.getAndSet(0); - if (ptm != 0) { - logger.warn("pause processing"); - stopped.await(ptm, TimeUnit.MILLISECONDS); - return; - } - - String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS); + String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS); if (msg == null) { return; } @@ -1038,18 +987,20 @@ public class FeatureTest { props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); - props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC); - props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true"); - props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000"); - props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000"); - props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS); - props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS); - props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS); - props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds", - "" + STD_ACTIVE_HEARTBEAT_MS); - props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS); - props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds", - "" + STD_OFFLINE_PUB_WAIT_MS); + props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); + props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); + props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), + "" + stdOfflinePubWaitMs); + props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), + "" + stdStartHeartbeatMs); + props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs); + props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs); + props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1), + "" + stdActiveHeartbeatMs); + props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), + "" + stdInterHeartbeatMs); return props; } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java new file mode 100644 index 00000000..6884bec8 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java @@ -0,0 +1,734 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize; +import java.io.IOException; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; +import org.onap.policy.drools.event.comm.TopicEndpoint; +import org.onap.policy.drools.event.comm.TopicListener; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; +import org.onap.policy.drools.properties.PolicyProperties; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having + * its own feature object. Uses real feature objects, as well as real DMaaP sources and + * sinks. However, the following are not: + * <dl> + * <dt>PolicyEngine, PolicyController, DroolsController</dt> + * <dd>mocked</dd> + * </dl> + * + * <p> + * The following fields must be set before executing this: + * <ul> + * <li>UEB_SERVERS</li> + * <li>INTERNAL_TOPIC</li> + * <li>EXTERNAL_TOPIC</li> + * </ul> + */ +public class FeatureTest2 { + + private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class); + + /** + * UEB servers for both internal & external topics. + */ + private static final String UEB_SERVERS = ""; + + /** + * Name of the topic used for inter-host communication. + */ + private static final String INTERNAL_TOPIC = ""; + + /** + * Name of the topic from which "external" events "arrive". + */ + private static final String EXTERNAL_TOPIC = ""; + + /** + * Consumer group to use when polling the external topic. + */ + private static final String EXTERNAL_GROUP = FeatureTest2.class.getName(); + + /** + * Name of the controller. + */ + private static final String CONTROLLER1 = "controller.one"; + + /** + * Maximum number of items to fetch from DMaaP in a single poll. + */ + private static final String FETCH_LIMIT = "5"; + + private static final long STD_REACTIVATE_WAIT_MS = 10000; + private static final long STD_IDENTIFICATION_MS = 10000; + private static final long STD_START_HEARTBEAT_MS = 15000; + private static final long STD_ACTIVE_HEARTBEAT_MS = 12000; + private static final long STD_INTER_HEARTBEAT_MS = 5000; + private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + private static final long EVENT_WAIT_SEC = 15; + + // these are saved and restored on exit from this test class + private static PoolingFeature.Factory saveFeatureFactory; + private static PoolingManagerImpl.Factory saveManagerFactory; + + /** + * Sink for external DMaaP topic. + */ + private static TopicSink externalSink; + + /** + * Context for the current test case. + */ + private Context ctx; + + + @BeforeClass + public static void setUpBeforeClass() { + saveFeatureFactory = PoolingFeature.getFactory(); + saveManagerFactory = PoolingManagerImpl.getFactory(); + + Properties props = makeSinkProperties(EXTERNAL_TOPIC); + externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0); + externalSink.start(); + } + + @AfterClass + public static void tearDownAfterClass() { + PoolingFeature.setFactory(saveFeatureFactory); + PoolingManagerImpl.setFactory(saveManagerFactory); + + externalSink.stop(); + } + + @Before + public void setUp() { + ctx = null; + } + + @After + public void tearDown() { + if (ctx != null) { + ctx.destroy(); + } + } + + @Ignore + @Test + public void test_SingleHost() throws Exception { + run(70, 1); + } + + @Ignore + @Test + public void test_TwoHosts() throws Exception { + run(200, 2); + } + + @Ignore + @Test + public void test_ThreeHosts() throws Exception { + run(200, 3); + } + + private void run(int nmessages, int nhosts) throws Exception { + ctx = new Context(nmessages); + + for (int x = 0; x < nhosts; ++x) { + ctx.addHost(); + } + + ctx.startHosts(); + ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + private String makeMessage(int reqnum) { + return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; + } + + private static Properties makeSinkProperties(String topic) { + Properties props = new Properties(); + + props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic); + + props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); + props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false"); + + return props; + } + + private static Properties makeSourceProperties(String topic) { + Properties props = new Properties(); + + props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic); + + props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); + props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false"); + + if (EXTERNAL_TOPIC.equals(topic)) { + // consumer group is a constant + props.setProperty( + PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, + EXTERNAL_GROUP); + + // consumer instance is generated by the BusConsumer code + } + + // else internal topic: feature populates info for internal topic + + return props; + } + + /** + * Context used for a single test case. + */ + private static class Context { + + private final FeatureFactory featureFactory; + private final ManagerFactory managerFactory; + + /** + * Hosts that have been added to this context. + */ + private final Deque<Host> hosts = new LinkedList<>(); + + /** + * Maps a drools controller to its policy controller. + */ + private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); + + /** + * Counts the number of decode errors. + */ + private final AtomicInteger nDecodeErrors = new AtomicInteger(0); + + /** + * Number of events we're still waiting to receive. + */ + private final CountDownLatch eventCounter; + + /** + * + * @param nEvents number of events to be processed + */ + public Context(int nEvents) { + featureFactory = new FeatureFactory(this); + managerFactory = new ManagerFactory(this); + eventCounter = new CountDownLatch(nEvents); + + PoolingFeature.setFactory(featureFactory); + PoolingManagerImpl.setFactory(managerFactory); + } + + /** + * Destroys the context, stopping any hosts that remain. + */ + public void destroy() { + stopHosts(); + hosts.clear(); + } + + /** + * Creates and adds a new host to the context. + * + * @return the new Host + */ + public Host addHost() { + Host host = new Host(this); + hosts.add(host); + + return host; + } + + /** + * Starts the hosts. + */ + public void startHosts() { + hosts.forEach(host -> host.start()); + } + + /** + * Stops the hosts. + */ + public void stopHosts() { + hosts.forEach(host -> host.stop()); + } + + /** + * Verifies that all hosts processed at least one message. + */ + public void checkAllSawAMsg() { + int x = 0; + for (Host host : hosts) { + assertTrue("x=" + x, host.messageSeen()); + ++x; + } + } + + /** + * Offers an event to the external topic. + * + * @param event + */ + public void offerExternal(String event) { + externalSink.send(event); + } + + /** + * Decodes an event. + * + * @param event + * @return the decoded event, or {@code null} if it cannot be decoded + */ + public Object decodeEvent(String event) { + return managerFactory.decodeEvent(null, null, event); + } + + /** + * Associates a controller with its drools controller. + * + * @param controller + * @param droolsController + */ + public void addController(PolicyController controller, DroolsController droolsController) { + drools2policy.put(droolsController, controller); + } + + /** + * @param droolsController + * @return the controller associated with a drools controller, or {@code null} if + * it has no associated controller + */ + public PolicyController getController(DroolsController droolsController) { + return drools2policy.get(droolsController); + } + + /** + * + * @return the number of decode errors so far + */ + public int getDecodeErrors() { + return nDecodeErrors.get(); + } + + /** + * Increments the count of decode errors. + */ + public void bumpDecodeErrors() { + nDecodeErrors.incrementAndGet(); + } + + /** + * + * @return the number of events that haven't been processed + */ + public long getRemainingEvents() { + return eventCounter.getCount(); + } + + /** + * Adds an event to the counter. + */ + public void addEvent() { + eventCounter.countDown(); + } + + /** + * Waits, for a period of time, for all events to be processed. + * + * @param time + * @param units + * @return {@code true} if all events have been processed, {@code false} otherwise + * @throws InterruptedException + */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { + return eventCounter.await(time, units); + } + + /** + * Waits, for a period of time, for all hosts to enter the Active state. + * + * @param timeMs maximum time to wait, in milliseconds + * @throws InterruptedException + */ + public void awaitAllActive(long timeMs) throws InterruptedException { + long tend = timeMs + System.currentTimeMillis(); + + for (Host host : hosts) { + long tremain = Math.max(0, tend - System.currentTimeMillis()); + assertTrue(host.awaitActive(tremain)); + } + } + } + + /** + * Simulates a single "host". + */ + private static class Host { + + private final PoolingFeature feature = new PoolingFeature(); + + /** + * {@code True} if this host has processed a message, {@code false} otherwise. + */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); + + private final TopicSource externalSource; + + // mock objects + private final PolicyEngine engine = mock(PolicyEngine.class); + private final ListenerController controller = mock(ListenerController.class); + private final DroolsController drools = mock(DroolsController.class); + + /** + * + * @param context + */ + public Host(Context context) { + + when(controller.getName()).thenReturn(CONTROLLER1); + when(controller.getDrools()).thenReturn(drools); + + Properties props = makeSourceProperties(EXTERNAL_TOPIC); + externalSource = TopicEndpoint.manager.addTopicSources(props).get(0); + + // stop consuming events if the controller stops + when(controller.stop()).thenAnswer(args -> { + externalSource.unregister(controller); + return true; + }); + + doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); + + context.addController(controller, drools); + } + + /** + * Waits, for a period of time, for the host to enter the Active state. + * + * @param timeMs time to wait, in milliseconds + * @return {@code true} if the host entered the Active state within the given + * amount of time, {@code false} otherwise + * @throws InterruptedException + */ + public boolean awaitActive(long timeMs) throws InterruptedException { + return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS); + } + + /** + * Starts threads for the host so that it begins consuming from both the external + * "DMaaP" topic and its own internal "DMaaP" topic. + */ + public void start() { + feature.beforeStart(engine); + feature.afterCreate(controller); + + feature.beforeStart(controller); + + // start consuming events from the external topic + externalSource.register(controller); + + feature.afterStart(controller); + } + + /** + * Stops the host's threads. + */ + public void stop() { + feature.beforeStop(controller); + externalSource.unregister(controller); + feature.afterStop(controller); + } + + /** + * Offers an event to the feature, before the policy controller handles it. + * + * @param protocol + * @param topic2 + * @param event + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { + return feature.beforeOffer(controller, protocol, topic2, event); + } + + /** + * Offers an event to the feature, after the policy controller handles it. + * + * @param protocol + * @param topic + * @param event + * @param success + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + + return feature.afterOffer(controller, protocol, topic, event, success); + } + + /** + * Offers an event to the feature, before the drools controller handles it. + * + * @param fact + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeInsert(Object fact) { + return feature.beforeInsert(drools, fact); + } + + /** + * Offers an event to the feature, after the drools controller handles it. + * + * @param fact + * @param successInsert {@code true} if it was successfully inserted by the drools + * controller, {@code false} otherwise + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterInsert(Object fact, boolean successInsert) { + return feature.afterInsert(drools, fact, successInsert); + } + + /** + * Indicates that a message was seen for this host. + */ + public void sawMessage() { + sawMsg.set(true); + } + + /** + * + * @return {@code true} if a message was seen for this host, {@code false} + * otherwise + */ + public boolean messageSeen() { + return sawMsg.get(); + } + } + + /** + * Listener for the external topic. Simulates the actions taken by + * <i>AggregatedPolicyController.onTopicEvent</i>. + */ + private static class MyExternalTopicListener implements Answer<Void> { + + private final Context context; + private final Host host; + + public MyExternalTopicListener(Context context, Host host) { + this.context = context; + this.host = host; + } + + @Override + public Void answer(InvocationOnMock args) throws Throwable { + int i = 0; + CommInfrastructure commType = args.getArgument(i++); + String topic = args.getArgument(i++); + String event = args.getArgument(i++); + + if (host.beforeOffer(commType, topic, event)) { + return null; + } + + boolean result; + Object fact = context.decodeEvent(event); + + if (fact == null) { + result = false; + context.bumpDecodeErrors(); + + } else { + result = true; + + if (!host.beforeInsert(fact)) { + // feature did not handle it so we handle it here + host.afterInsert(fact, result); + + host.sawMessage(); + context.addEvent(); + } + } + + host.afterOffer(commType, topic, event, result); + return null; + } + } + + /** + * Simulator for the feature-level factory. + */ + private static class FeatureFactory extends PoolingFeature.Factory { + + private final Context context; + + /** + * + * @param context + */ + public FeatureFactory(Context context) { + this.context = context; + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public Properties getProperties(String featName) { + Properties props = new Properties(); + + props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); + + props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); + props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); + props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), + "" + STD_OFFLINE_PUB_WAIT_MS); + props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), + "" + STD_START_HEARTBEAT_MS); + props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS); + props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS); + props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1), + "" + STD_ACTIVE_HEARTBEAT_MS); + props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), + "" + STD_INTER_HEARTBEAT_MS); + + props.putAll(makeSinkProperties(INTERNAL_TOPIC)); + props.putAll(makeSourceProperties(INTERNAL_TOPIC)); + + return props; + } + + @Override + public PolicyController getController(DroolsController droolsController) { + return context.getController(droolsController); + } + } + + /** + * Simulator for the pooling manager factory. + */ + private static class ManagerFactory extends PoolingManagerImpl.Factory { + + /** + * Used to decode events from the external topic. + */ + private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() { + @Override + protected ObjectMapper initialValue() { + return new ObjectMapper(); + } + }; + + /** + * Used to decode events into a Map. + */ + private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {}; + + /** + * + * @param context + */ + public ManagerFactory(Context context) { + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public boolean canDecodeEvent(DroolsController drools, String topic) { + return true; + } + + @Override + public Object decodeEvent(DroolsController drools, String topic, String event) { + try { + return mapper.get().readValue(event, typeRef); + + } catch (IOException e) { + logger.warn("cannot decode external event", e); + return null; + } + } + } + + /** + * Controller that also implements the {@link TopicListener} interface. + */ + private static interface ListenerController extends PolicyController, TopicListener { + + } +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index 7782e475..f8f37559 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -22,6 +22,7 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; @@ -122,8 +123,8 @@ public class PoolingFeatureTest { when(factory.getController(drools2)).thenReturn(controller2); when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled); - when(factory.makeManager(any(), any())).thenAnswer(args -> { - PoolingProperties props = args.getArgument(1); + when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> { + PoolingProperties props = args.getArgument(2); PoolingManagerImpl mgr = mock(PoolingManagerImpl.class); @@ -149,6 +150,19 @@ public class PoolingFeatureTest { } @Test + public void testGetHost() { + String host = pool.getHost(); + assertNotNull(host); + + // create another and ensure it generates another host name + pool = new PoolingFeature(); + String host2 = pool.getHost(); + assertNotNull(host2); + + assertTrue(!host.equals(host2)); + } + + @Test public void testGetSequenceNumber() { assertEquals(0, pool.getSequenceNumber()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index e32fa545..e0024b79 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -73,6 +74,7 @@ public class PoolingManagerImplTest { protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1; protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1; + private static final String MY_HOST = "my.host"; private static final String HOST2 = "other.host"; private static final String MY_CONTROLLER = "my.controller"; @@ -111,6 +113,7 @@ public class PoolingManagerImplTest { private DroolsController drools; private Serializer ser; private Factory factory; + private CountDownLatch active; private PoolingManagerImpl mgr; @@ -140,6 +143,7 @@ public class PoolingManagerImplTest { futures = new LinkedList<>(); ser = new Serializer(); + active = new CountDownLatch(1); factory = mock(Factory.class); eventQueue = mock(EventQueue.class); @@ -151,7 +155,7 @@ public class PoolingManagerImplTest { when(factory.makeEventQueue(any())).thenReturn(eventQueue); when(factory.makeClassExtractors(any())).thenReturn(extractors); - when(factory.makeDmaapManager(any())).thenReturn(dmaap); + when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap); when(factory.makeScheduler()).thenReturn(sched); when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true); when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT); @@ -179,12 +183,12 @@ public class PoolingManagerImplTest { PoolingManagerImpl.setFactory(factory); - mgr = new PoolingManagerImpl(controller, poolProps); + mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active); } @Test - public void testPoolingManagerImpl() { - mgr = new PoolingManagerImpl(controller, poolProps); + public void testPoolingManagerImpl() throws Exception { + verify(factory).makeDmaapManager(any(), any()); State st = mgr.getCurrent(); assertTrue(st instanceof IdleState); @@ -202,7 +206,7 @@ public class PoolingManagerImplTest { PolicyController ctlr = mock(PolicyController.class); PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class, - xxx -> new PoolingManagerImpl(ctlr, poolProps)); + () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active)); assertNotNull(ex.getCause()); assertTrue(ex.getCause() instanceof ClassCastException); } @@ -211,23 +215,28 @@ public class PoolingManagerImplTest { public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException { // throw an exception when we try to create the dmaap manager PoolingFeatureException ex = new PoolingFeatureException(); - when(factory.makeDmaapManager(any())).thenThrow(ex); + when(factory.makeDmaapManager(any(), any())).thenThrow(ex); PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class, - xxx -> new PoolingManagerImpl(controller, poolProps)); + () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active)); assertEquals(ex, ex2.getCause()); } @Test - public void testGetHost() { - String host = mgr.getHost(); - assertNotNull(host); + public void testGetCurrent() throws Exception { + assertEquals(IdleState.class, mgr.getCurrent().getClass()); + + startMgr(); + + assertEquals(StartState.class, mgr.getCurrent().getClass()); + } - // create another manager and ensure it generates a different host - mgr = new PoolingManagerImpl(controller, poolProps); + @Test + public void testGetHost() { + assertEquals(MY_HOST, mgr.getHost()); - assertNotNull(mgr.getHost()); - assertFalse(host.equals(mgr.getHost())); + mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active); + assertEquals(HOST2, mgr.getHost()); } @Test @@ -268,7 +277,7 @@ public class PoolingManagerImplTest { PoolingFeatureException ex = new PoolingFeatureException(); doThrow(ex).when(dmaap).startPublisher(); - PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, xxx -> mgr.beforeStart()); + PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart()); assertEquals(ex, ex2); // should never start the scheduler @@ -453,8 +462,9 @@ public class PoolingManagerImplTest { // should have set the new filter verify(dmaap, times(++ntimes)).setFilter(any()); - // should have cancelled the timer - assertEquals(1, futures.size()); + // should have cancelled the timers + assertEquals(2, futures.size()); + verify(futures.poll()).cancel(false); verify(futures.poll()).cancel(false); /* @@ -465,8 +475,9 @@ public class PoolingManagerImplTest { // should have set the new filter verify(dmaap, times(++ntimes)).setFilter(any()); - // timer should still be active - assertEquals(1, futures.size()); + // new timers should now be active + assertEquals(2, futures.size()); + verify(futures.poll(), never()).cancel(false); verify(futures.poll(), never()).cancel(false); } @@ -548,7 +559,7 @@ public class PoolingManagerImplTest { ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class); - verify(sched).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(), + verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(), unitCap.capture()); assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue()); @@ -987,7 +998,7 @@ public class PoolingManagerImplTest { // route the message to this host mgr.startDistributing(makeAssignments(true)); - + // generate RuntimeException when onTopicEvent() is invoked doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any()); @@ -1056,21 +1067,27 @@ public class PoolingManagerImplTest { startMgr(); // route the message to this host - mgr.startDistributing(makeAssignments(true)); - + assertNotNull(mgr.startDistributing(makeAssignments(true))); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(eventQueue, never()).add(any()); - // null assignments should be ignored - mgr.startDistributing(null); + // null assignments should cause message to be queued + assertNull(mgr.startDistributing(null)); + assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(eventQueue).add(any()); + + // route the message to this host + assertNotNull(mgr.startDistributing(makeAssignments(true))); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(eventQueue).add(any()); // route the message to the other host - mgr.startDistributing(makeAssignments(false)); - + assertNotNull(mgr.startDistributing(makeAssignments(false))); assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(eventQueue).add(any()); } @Test @@ -1086,7 +1103,8 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to this host - assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS)); + CountDownLatch latch = mgr.startDistributing(makeAssignments(true)); + assertTrue(latch.await(2, TimeUnit.SECONDS)); // all of the events should have been processed locally verify(dmaap, times(START_PUB)).publish(any()); @@ -1106,7 +1124,8 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to the OTHER host - assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS)); + CountDownLatch latch = mgr.startDistributing(makeAssignments(false)); + assertTrue(latch.await(2, TimeUnit.SECONDS)); // all of the events should have been forwarded verify(dmaap, times(4)).publish(any()); @@ -1142,6 +1161,7 @@ public class PoolingManagerImplTest { assertTrue(st instanceof ActiveState); assertEquals(mgr.getHost(), st.getHost()); assertEquals(asgn, mgr.getAssignments()); + assertEquals(0, active.getCount()); } @Test @@ -1149,6 +1169,7 @@ public class PoolingManagerImplTest { State st = mgr.goInactive(); assertTrue(st instanceof InactiveState); assertEquals(mgr.getHost(), st.getHost()); + assertEquals(1, active.getCount()); } @Test @@ -1330,7 +1351,7 @@ public class PoolingManagerImplTest { */ private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) { try { - func.apply(null); + func.apply(); throw new AssertionError("missing exception"); } catch (Exception e) { @@ -1349,10 +1370,9 @@ public class PoolingManagerImplTest { /** * Invokes the function. * - * @param arg always {@code null} * @throws T if an error occurs */ - public void apply(Void arg) throws T; + public void apply() throws T; } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java index 2d734c1c..459c770a 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java @@ -48,13 +48,13 @@ public class PoolingPropertiesTest { public static final boolean STD_FEATURE_ENABLED = true; public static final int STD_OFFLINE_LIMIT = 10; public static final long STD_OFFLINE_AGE_MS = 1000L; - public static final long STD_START_HEARTBEAT_MS = 2000L; - public static final long STD_REACTIVATE_MS = 3000L; - public static final long STD_IDENTIFICATION_MS = 4000L; - public static final long STD_LEADER_MS = 5000L; - public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L; - public static final long STD_INTER_HEARTBEAT_MS = 7000L; - public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L; + public static final long STD_OFFLINE_PUB_WAIT_MS = 2000L; + public static final long STD_START_HEARTBEAT_MS = 3000L; + public static final long STD_REACTIVATE_MS = 4000L; + public static final long STD_IDENTIFICATION_MS = 5000L; + public static final long STD_LEADER_MS = 6000L; + public static final long STD_ACTIVE_HEARTBEAT_MS = 7000L; + public static final long STD_INTER_HEARTBEAT_MS = 8000L; private Properties plain; private PoolingProperties pooling; @@ -99,8 +99,13 @@ public class PoolingPropertiesTest { } @Test + public void testGetOfflinePubWaitMs() throws PropertyException { + doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs()); + } + + @Test public void testGetStartHeartbeatMs() throws PropertyException { - doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 50000L, xxx -> pooling.getStartHeartbeatMs()); + doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 100000L, xxx -> pooling.getStartHeartbeatMs()); } @Test @@ -123,11 +128,6 @@ public class PoolingPropertiesTest { doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs()); } - @Test - public void testGetOfflinePubWaitMs() throws PropertyException { - doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs()); - } - /** * Tests a particular property. Verifies that the correct value is returned if the * specialized property has a value or the property has no value. Also verifies that @@ -174,12 +174,12 @@ public class PoolingPropertiesTest { props.setProperty(specialize(FEATURE_ENABLED, CONTROLLER), "" + STD_FEATURE_ENABLED); props.setProperty(specialize(OFFLINE_LIMIT, CONTROLLER), "" + STD_OFFLINE_LIMIT); props.setProperty(specialize(OFFLINE_AGE_MS, CONTROLLER), "" + STD_OFFLINE_AGE_MS); + props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS); props.setProperty(specialize(START_HEARTBEAT_MS, CONTROLLER), "" + STD_START_HEARTBEAT_MS); props.setProperty(specialize(REACTIVATE_MS, CONTROLLER), "" + STD_REACTIVATE_MS); props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS); props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS); props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS); - props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS); return props; } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java index 505dc400..d09650db 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java @@ -123,6 +123,24 @@ public class SpecPropertiesTest { } @Test + public void testSpecPropertiesStringStringProperties_EmptyPrefix() { + supportingProps = new Properties(); + + supportingProps.setProperty(PROP_NO_PREFIX, VAL_NO_PREFIX); + supportingProps.setProperty("a.value", VAL_GEN); + supportingProps.setProperty("b.value", VAL_GEN); + supportingProps.setProperty(MY_SPEC + ".b.value", VAL_SPEC); + + // no supporting properties + props = new SpecProperties("", MY_SPEC, supportingProps); + + assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX))); + assertEquals(VAL_GEN, props.getProperty(gen("a.value"))); + assertEquals(VAL_SPEC, props.getProperty(MY_SPEC + ".b.value")); + assertNull(props.getProperty(gen(PROP_UNKNOWN))); + } + + @Test public void testWithTrailingDot() { // neither has trailing dot assertEquals(PREFIX_GEN, props.getPrefix()); @@ -132,6 +150,16 @@ public class SpecPropertiesTest { props = new SpecProperties(PREFIX_GEN, MY_SPEC + "."); assertEquals(PREFIX_GEN, props.getPrefix()); assertEquals(PREFIX_SPEC, props.getSpecPrefix()); + + // first is empty + props = new SpecProperties("", MY_SPEC); + assertEquals("", props.getPrefix()); + assertEquals(MY_SPEC + ".", props.getSpecPrefix()); + + // second is empty + props = new SpecProperties(PREFIX_GEN, ""); + assertEquals(PREFIX_GEN, props.getPrefix()); + assertEquals(PREFIX_GEN, props.getSpecPrefix()); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index 7b4b0602..27284dcd 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -298,18 +298,18 @@ public class ActiveStateTest extends BasicStateTester { // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); // predecessor's heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -327,13 +327,13 @@ public class ActiveStateTest extends BasicStateTester { // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -389,13 +389,13 @@ public class ActiveStateTest extends BasicStateTester { // set up next state State next = mock(State.class); - when(mgr.goInactive()).thenReturn(next); + when(mgr.goStart()).thenReturn(next); // fire the task - should transition assertEquals(next, task.third().fire()); - // should indicate failure - verify(mgr).internalTopicFailed(); + // should stop distributing + verify(mgr).startDistributing(null); // should publish an offline message Offline msg = captureAdminMessage(Offline.class); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java index 394adaee..ae53ce05 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java @@ -22,12 +22,19 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; import org.junit.Before; import org.junit.Test; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Query; import org.onap.policy.drools.utils.Pair; public class InactiveStateTest extends BasicStateTester { @@ -53,6 +60,42 @@ public class InactiveStateTest extends BasicStateTester { } @Test + public void testProcessLeader() { + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); + + String[] arr = {PREV_HOST, MY_HOST, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(PREV_HOST, asgn); + + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + } + + @Test + public void testProcessLeader_Invalid() { + Leader msg = new Leader(PREV_HOST, null); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + } + + @Test + public void testProcessQuery() { + State next = mock(State.class); + when(mgr.goQuery()).thenReturn(next); + + assertEquals(next, state.process(new Query())); + + Identification ident = captureAdminMessage(Identification.class); + assertEquals(MY_HOST, ident.getSource()); + assertEquals(ASGN3, ident.getAssignments()); + } + + @Test public void testGoInatcive() { assertNull(state.goInactive()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java index e1718418..7ac58439 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java @@ -65,39 +65,6 @@ public class ProcessingStateTest extends BasicStateTester { } @Test - public void testGoActive_WithAssignment() { - State act = mock(State.class); - State inact = mock(State.class); - - when(mgr.goActive()).thenReturn(act); - when(mgr.goInactive()).thenReturn(inact); - - String[] arr = {HOST2, PREV_HOST, MY_HOST}; - BucketAssignments asgn = new BucketAssignments(arr); - - assertEquals(act, state.goActive(asgn)); - - verify(mgr).startDistributing(asgn); - } - - @Test - public void testGoActive_WithoutAssignment() { - State act = mock(State.class); - State inact = mock(State.class); - - when(mgr.goActive()).thenReturn(act); - when(mgr.goInactive()).thenReturn(inact); - - String[] arr = {HOST2, PREV_HOST}; - BucketAssignments asgn = new BucketAssignments(arr); - - assertEquals(inact, state.goActive(asgn)); - - verify(mgr).startDistributing(asgn); - - } - - @Test public void testProcessQuery() { State next = mock(State.class); when(mgr.goQuery()).thenReturn(next); @@ -155,13 +122,6 @@ public class ProcessingStateTest extends BasicStateTester { } @Test - public void testMakeIdentification() { - Identification ident = state.makeIdentification(); - assertEquals(MY_HOST, ident.getSource()); - assertEquals(ASGN3, ident.getAssignments()); - } - - @Test public void testGetAssignments() { // assignments from constructor assertEquals(ASGN3, state.getAssignments()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index a7c3a3d5..80778ed4 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -254,10 +253,13 @@ public class QueryStateTest extends BasicStateTester { // should published an Offline message and go inactive State next = mock(State.class); - when(mgr.goInactive()).thenReturn(next); + when(mgr.goStart()).thenReturn(next); assertEquals(next, timer.second().fire()); + // should stop distributing + verify(mgr).startDistributing(null); + Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); } @@ -343,21 +345,6 @@ public class QueryStateTest extends BasicStateTester { } @Test - public void testHasAssignment() { - // null assignment - mgr.startDistributing(null); - assertFalse(state.hasAssignment()); - - // not in assignments - state.setAssignments(new BucketAssignments(new String[] {HOST3})); - assertFalse(state.hasAssignment()); - - // it IS in the assignments - state.setAssignments(new BucketAssignments(new String[] {MY_HOST})); - assertTrue(state.hasAssignment()); - } - - @Test public void testRecordInfo_NullSource() { state.setAssignments(ASGN3); state.setLeader(MY_HOST); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index af4e8f13..ee4c1add 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; @@ -39,6 +40,7 @@ import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; import org.onap.policy.drools.utils.Pair; +import org.onap.policy.drools.utils.Triple; public class StartStateTest extends BasicStateTester { @@ -78,15 +80,36 @@ public class StartStateTest extends BasicStateTester { assertEquals(MY_HOST, msg.first()); assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs()); - Pair<Long, StateTimerTask> timer = onceTasks.removeFirst(); - assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue()); + /* + * Verify heartbeat generator + */ + Triple<Long, Long, StateTimerTask> generator = repeatedTasks.removeFirst(); + + assertEquals(STD_INTER_HEARTBEAT_MS, generator.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, generator.second().longValue()); + + // invoke the task - it should generate another heartbeat + assertEquals(null, generator.third().fire()); + verify(mgr, times(2)).publish(MY_HOST, msg.second()); + + // and again + assertEquals(null, generator.third().fire()); + verify(mgr, times(3)).publish(MY_HOST, msg.second()); + + + /* + * Verify heartbeat checker + */ + Pair<Long, StateTimerTask> checker = onceTasks.removeFirst(); + + assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, checker.second().fire()); verify(mgr).internalTopicFailed(); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index a184dfad..47624aa0 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -154,12 +154,48 @@ public class StateTest extends BasicStateTester { } @Test - public void testGoActive() { - State next = mock(State.class); - when(mgr.goActive()).thenReturn(next); + public void testGoActive_WithAssignment() { + State act = mock(State.class); + State inact = mock(State.class); - State next2 = state.goActive(); - assertEquals(next, next2); + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(act, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + } + + @Test + public void testGoActive_WithoutAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(inact, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + } + + @Test + public void testGoActive_NullAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + assertEquals(inact, state.goActive(null)); + + verify(mgr, never()).startDistributing(any()); } @Test @@ -375,6 +411,20 @@ public class StateTest extends BasicStateTester { } @Test + public void testMissedHeartbeat() { + State next = mock(State.class); + when(mgr.goStart()).thenReturn(next); + + State next2 = state.missedHeartbeat(); + assertEquals(next, next2); + + verify(mgr).startDistributing(null); + + Offline msg = captureAdminMessage(Offline.class); + assertEquals(MY_HOST, msg.getSource()); + } + + @Test public void testInternalTopicFailed() { State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); @@ -398,6 +448,13 @@ public class StateTest extends BasicStateTester { } @Test + public void testMakeIdentification() { + Identification ident = state.makeIdentification(); + assertEquals(MY_HOST, ident.getSource()); + assertEquals(ASGN3, ident.getAssignments()); + } + + @Test public void testMakeOffline() { Offline msg = state.makeOffline(); |