diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
12 files changed, 165 insertions, 129 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 102eda75..abb4da33 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -202,6 +202,7 @@ public class DmaapManager { } catch (InterruptedException e) { logger.warn("message transmission stopped due to {}", e.getMessage()); + Thread.currentThread().interrupt(); } try { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 21cbc4db..2bec4579 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -21,6 +21,7 @@ package org.onap.policy.drools.pooling; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; @@ -54,6 +55,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF private static Factory factory; /** + * ID of this host. + */ + private final String host; + + /** * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; @@ -75,6 +81,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF */ public PoolingFeature() { super(); + + this.host = UUID.randomUUID().toString(); } protected static Factory getFactory() { @@ -90,6 +98,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingFeature.factory = factory; } + public String getHost() { + return host; + } + @Override public int getSequenceNumber() { return 0; @@ -123,7 +135,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingProperties props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); - ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props)); + ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name, e); @@ -371,12 +383,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Makes a pooling manager for a controller. * + * @param host name/uuid of this host * @param controller * @param props properties to use to configure the manager * @return a new pooling manager */ - public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) { - return new PoolingManagerImpl(controller, props); + public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) { + return new PoolingManagerImpl(host, controller, props); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 422efdd7..d2312469 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; +import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static Factory factory = new Factory(); /** - * ID of the last host that was created. - */ - private static final AtomicReference<String> lastHost = new AtomicReference<>(null); - - /** * ID of this host. */ private final String host; @@ -158,28 +152,24 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Constructs the manager, initializing all of the data structures. - * + * + * @param host name/uuid of this host * @param controller controller with which this is associated * @param props feature properties specific to the controller */ - public PoolingManagerImpl(PolicyController controller, PoolingProperties props) { - this.host = UUID.randomUUID().toString(); + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) { + this.host = host; this.controller = controller; this.props = props; - lastHost.set(this.host); - try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - - SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), - props.getSource()); - this.extractors = factory.makeClassExtractors(spec); - - this.dmaapMgr = factory.makeDmaapManager(props); + this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), + makeDmaapProps(controller, props.getSource())); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -203,16 +193,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Used by junit tests. - * - * @return the ID of the last host that was created, or {@code null} if no hosts have - * been created yet - */ - protected static String getLastHost() { - return lastHost.get(); - } - - /** * Should only be used by junit tests. * * @return the current state @@ -237,6 +217,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Makes properties for configuring extractors. + * + * @param controller the controller for which the extractors will be configured + * @param source properties from which to get the extractor properties + * @return extractor properties + */ + private Properties makeExtractorProps(PolicyController controller, Properties source) { + return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source); + } + + /** + * Makes properties for configuring DMaaP. Copies properties from the source that + * start with the Pooling property prefix followed by the controller name, stripping + * the prefix and controller name. + * + * @param controller the controller for which DMaaP will be configured + * @param source properties from which to get the DMaaP properties + * @return DMaaP properties + */ + private Properties makeDmaapProps(PolicyController controller, Properties source) { + SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source); + + // could be UEB or DMAAP, so add both + addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + + return props; + } + + /** + * Adds DMaaP consumer properties, consumer group & instance. The group is the host + * and the instance is a constant. + * + * @param props where to add the new properties + * @param prefix property prefix + */ + private void addDmaapConsumerProps(SpecProperties props, String prefix) { + String fullpfx = props.getSpecPrefix() + prefix + "." + topic; + + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); + } + + /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. * @@ -288,8 +312,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { scheduler = null; if (!(current instanceof IdleState)) { - dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); + dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } } @@ -751,26 +775,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public CountDownLatch startDistributing(BucketAssignments asgn) { - if (asgn == null) { - return null; - } - - logger.info("new assignments for topic {}", getTopic()); - synchronized (curLocker) { + int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); + logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); assignments = asgn; } + if (asgn == null) { + return null; + } + /* * publish the events from the event queue, but do it in a background thread so * that the state machine can enter its correct state BEFORE we start processing * the events */ CountDownLatch latch = new CountDownLatch(1); - + new Thread(() -> { synchronized (curLocker) { if (assignments == null) { + latch.countDown(); return; } @@ -779,11 +804,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { while ((ev = eventq.poll()) != null) { handle(ev); } - + latch.countDown(); } }).start(); - + return latch; } @@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Creates a DMaaP manager. * - * @param props properties used to configure the manager + * @param topic name of the internal DMaaP topic + * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException { - return new DmaapManager(props.getPoolingTopic(), props.getSource()); + public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { + return new DmaapManager(topic, props); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java index 4e8de0d0..54319423 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration { public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled"; public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit"; public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds"; + public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds"; public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds"; public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds"; public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds"; public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds"; - public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; /** * Type of item that the extractors will be extracting. @@ -94,10 +94,17 @@ public class PoolingProperties extends SpecPropertyConfiguration { private long offlineAgeMs; /** + * Time, in milliseconds, to wait for an "Offline" message to be published + * to DMaaP. + */ + @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") + private long offlinePubWaitMs; + + /** * Time, in milliseconds, to wait for this host's heart beat during the * start-up state. */ - @Property(name = START_HEARTBEAT_MS, defaultValue = "50000") + @Property(name = START_HEARTBEAT_MS, defaultValue = "100000") private long startHeartbeatMs; /** @@ -123,19 +130,12 @@ public class PoolingProperties extends SpecPropertyConfiguration { /** * Time, in milliseconds, to wait between heart beat generations during - * the active state. + * the active and start-up states. */ @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000") private long interHeartbeatMs; /** - * Time, in milliseconds, to wait for an "Offline" message to be published - * to DMaaP. - */ - @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") - private long offlinePubWaitMs; - - /** * @param controllerName the name of the controller * @param props set of properties used to configure this * @throws PropertyException if an error occurs @@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration { return offlineAgeMs; } + public long getOfflinePubWaitMs() { + return offlinePubWaitMs; + } + public long getStartHeartbeatMs() { return startHeartbeatMs; } @@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration { public long getInterHeartbeatMs() { return interHeartbeatMs; } - - public long getOfflinePubWaitMs() { - return offlinePubWaitMs; - } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index 31b4e614..b62ea0a7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -40,7 +40,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any specialization + * @param prefix the property name prefix that appears before any specialization, may + * be "" * @param specialization the property name specialization (e.g., session name) */ public SpecProperties(String prefix, String specialization) { @@ -50,7 +51,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any specialization + * @param prefix the property name prefix that appears before any specialization, may + * be "" * @param specialization the property name specialization (e.g., session name) * @param props the default properties */ @@ -68,7 +70,7 @@ public class SpecProperties extends Properties { * @return the text, with a trailing "." */ private static String withTrailingDot(String text) { - return text.endsWith(".") ? text : text + "."; + return text.isEmpty() || text.endsWith(".") ? text : text + "."; } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java index 2752ca8c..ccca77be 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java @@ -374,7 +374,7 @@ public class ClassExtractors { } catch (NoSuchMethodException expected) { // no getXxx() method, maybe there's a field by this name - logger.debug("no method {} in {}", nm, clazz.getName()); + logger.debug("no method {} in {}", nm, clazz.getName(), expected); return null; } catch (SecurityException e) { @@ -445,7 +445,7 @@ public class ClassExtractors { } catch (NoSuchFieldException expected) { // no field by this name - try super class & interfaces - logger.debug("no field {} in {}", name, clazz.getName()); + logger.debug("no field {} in {}", name, clazz.getName(), expected); } catch (SecurityException e) { throw new ExtractorException("inaccessible field " + clazz + "." + name, e); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index b0a36cd9..8f0a902a 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState { if ((succHost = assigned.higher(getHost())) == null) { // wrapped around - successor is the first host in the set succHost = assigned.first(); - logger.info("this host's successor is {} on topic {}", succHost, getTopic()); } + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); if ((predHost = assigned.lower(getHost())) == null) { // wrapped around - predecessor is the last host in the set predHost = assigned.last(); - logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } @Override public void start() { + super.start(); addTimers(); genHeartbeat(); } @@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState { /* * heart beat generator */ - long genMs = getProperties().getActiveHeartbeatMs(); + long genMs = getProperties().getInterHeartbeatMs(); scheduleWithFixedDelay(genMs, genMs, () -> { genHeartbeat(); @@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState { /* * my heart beat checker */ - long interMs = getProperties().getInterHeartbeatMs(); + long waitMs = getProperties().getActiveHeartbeatMs(); - scheduleWithFixedDelay(interMs, interMs, () -> { + scheduleWithFixedDelay(waitMs, waitMs, () -> { if (myHeartbeatSeen) { myHeartbeatSeen = false; return null; @@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState { // missed my heart beat logger.error("missed my heartbeat on topic {}", getTopic()); - return internalTopicFailed(); + return missedHeartbeat(); }); /* @@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState { */ if (!predHost.isEmpty()) { - scheduleWithFixedDelay(interMs, interMs, () -> { + scheduleWithFixedDelay(waitMs, waitMs, () -> { if (predHeartbeatSeen) { predHeartbeatSeen = false; return null; diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index 6be2fb84..da044252 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -44,24 +44,17 @@ public class InactiveState extends State { @Override public void start() { - super.start(); - schedule(getProperties().getReactivateMs(), () -> goStart()); } @Override public State process(Leader msg) { - if(isValid(msg)) { + if (isValid(msg)) { logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); - startDistributing(msg.getAssignments()); - - if(msg.getAssignments().hasAssignment(getHost())) { - logger.info("received Leader message on topic {}", getTopic()); - return goActive(); - } + return goActive(msg.getAssignments()); } - + return null; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index 1e9bb581..e9dc0324 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -74,24 +74,6 @@ public class ProcessingState extends State { } /** - * Goes active with a new set of assignments. - * - * @param asgn new assignments - * @return the new state, either Active or Inactive, depending on whether or not this - * host has an assignment - */ - protected State goActive(BucketAssignments asgn) { - startDistributing(asgn); - - if (asgn.hasAssignment(getHost())) { - return goActive(); - - } else { - return goInactive(); - } - } - - /** * Generates an Identification message and goes to the query state. */ @Override @@ -154,11 +136,11 @@ public class ProcessingState extends State { } Leader msg = makeLeader(alive); - publish(msg); + logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size()); - setAssignments(msg.getAssignments()); + publish(msg); - return goActive(); + return goActive(msg.getAssignments()); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 9045165b..1a4da150 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -63,7 +63,6 @@ public class QueryState extends ProcessingState { @Override public void start() { - super.start(); // start identification timer @@ -85,39 +84,21 @@ public class QueryState extends ProcessingState { if (!sawSelfIdent) { // didn't see our identification logger.error("missed our own Ident message on topic {}", getTopic()); - return internalTopicFailed(); + return missedHeartbeat(); } else if (isLeader()) { // "this" host is the new leader logger.info("this host is the new leader for topic {}", getTopic()); return becomeLeader(alive); - } else if (hasAssignment()) { - /* - * this host is not the new leader, but it does have an assignment - - * return to the active state while we wait for the leader - */ - logger.info("no new leader on topic {}", getTopic()); - return goActive(); - } else { - // not the leader and no assignment yet + // not the leader - return to previous state logger.info("no new leader on topic {}", getTopic()); - return goInactive(); + return goActive(getAssignments()); } }); } - /** - * Determines if this host has an assignment in the CURRENT assignments. - * - * @return {@code true} if this host has an assignment, {@code false} otherwise - */ - protected boolean hasAssignment() { - BucketAssignments asgn = getAssignments(); - return (asgn != null && asgn.hasAssignment(getHost())); - } - @Override public State goQuery() { return null; diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index a978e245..3068cfc9 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -67,8 +67,22 @@ public class StartState extends State { super.start(); - publish(getHost(), makeHeartbeat(hbTimestampMs)); + Heartbeat hb = makeHeartbeat(hbTimestampMs); + publish(getHost(), hb); + /* + * heart beat generator + */ + long genMs = getProperties().getInterHeartbeatMs(); + + scheduleWithFixedDelay(genMs, genMs, () -> { + publish(getHost(), hb); + return null; + }); + + /* + * my heart beat checker + */ schedule(getProperties().getStartHeartbeatMs(), () -> { logger.error("missed heartbeat on topic {}", getTopic()); return internalTopicFailed(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 421b9225..54e93230 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -113,12 +113,21 @@ public abstract class State { } /** - * Transitions to the "active" state. + * Goes active with a new set of assignments. * - * @return the new state + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment */ - public final State goActive() { - return mgr.goActive(); + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn.hasAssignment(getHost())) { + return mgr.goActive(); + + } else { + return goInactive(); + } } /** @@ -322,7 +331,21 @@ public abstract class State { } /** - * Indicates that the internal topic failed. + * Indicates that we failed to see our own heartbeat; must be a problem with the + * internal topic. + * + * @return a new {@link StartState} + */ + protected final State missedHeartbeat() { + publish(makeOffline()); + mgr.startDistributing(null); + + return mgr.goStart(); + } + + /** + * Indicates that the internal topic failed; this should only be invoked from the + * StartState. * * @return a new {@link InactiveState} */ |