summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java19
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java108
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java8
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java15
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java13
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java24
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java25
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java16
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java33
12 files changed, 165 insertions, 129 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 102eda75..abb4da33 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -202,6 +202,7 @@ public class DmaapManager {
} catch (InterruptedException e) {
logger.warn("message transmission stopped due to {}", e.getMessage());
+ Thread.currentThread().interrupt();
}
try {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 21cbc4db..2bec4579 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -21,6 +21,7 @@
package org.onap.policy.drools.pooling;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
@@ -54,6 +55,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private static Factory factory;
/**
+ * ID of this host.
+ */
+ private final String host;
+
+ /**
* Entire set of feature properties, including those specific to various controllers.
*/
private Properties featProps = null;
@@ -75,6 +81,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
*/
public PoolingFeature() {
super();
+
+ this.host = UUID.randomUUID().toString();
}
protected static Factory getFactory() {
@@ -90,6 +98,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
PoolingFeature.factory = factory;
}
+ public String getHost() {
+ return host;
+ }
+
@Override
public int getSequenceNumber() {
return 0;
@@ -123,7 +135,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
PoolingProperties props = new PoolingProperties(name, featProps);
logger.info("pooling enabled for {}", name);
- ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
+ ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
} catch (PropertyException e) {
logger.error("pooling disabled due to exception for {}", name, e);
@@ -371,12 +383,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Makes a pooling manager for a controller.
*
+ * @param host name/uuid of this host
* @param controller
* @param props properties to use to configure the manager
* @return a new pooling manager
*/
- public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
- return new PoolingManagerImpl(controller, props);
+ public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
+ return new PoolingManagerImpl(host, controller, props);
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 422efdd7..d2312469 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState;
import org.onap.policy.drools.pooling.state.StartState;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
@@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private static Factory factory = new Factory();
/**
- * ID of the last host that was created.
- */
- private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
-
- /**
* ID of this host.
*/
private final String host;
@@ -158,28 +152,24 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Constructs the manager, initializing all of the data structures.
- *
+ *
+ * @param host name/uuid of this host
* @param controller controller with which this is associated
* @param props feature properties specific to the controller
*/
- public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
- this.host = UUID.randomUUID().toString();
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+ this.host = host;
this.controller = controller;
this.props = props;
- lastHost.set(this.host);
-
try {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
-
- SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
- props.getSource());
- this.extractors = factory.makeClassExtractors(spec);
-
- this.dmaapMgr = factory.makeDmaapManager(props);
+ this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
+ this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
+ makeDmaapProps(controller, props.getSource()));
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -203,16 +193,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Used by junit tests.
- *
- * @return the ID of the last host that was created, or {@code null} if no hosts have
- * been created yet
- */
- protected static String getLastHost() {
- return lastHost.get();
- }
-
- /**
* Should only be used by junit tests.
*
* @return the current state
@@ -237,6 +217,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
+ * Makes properties for configuring extractors.
+ *
+ * @param controller the controller for which the extractors will be configured
+ * @param source properties from which to get the extractor properties
+ * @return extractor properties
+ */
+ private Properties makeExtractorProps(PolicyController controller, Properties source) {
+ return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
+ }
+
+ /**
+ * Makes properties for configuring DMaaP. Copies properties from the source that
+ * start with the Pooling property prefix followed by the controller name, stripping
+ * the prefix and controller name.
+ *
+ * @param controller the controller for which DMaaP will be configured
+ * @param source properties from which to get the DMaaP properties
+ * @return DMaaP properties
+ */
+ private Properties makeDmaapProps(PolicyController controller, Properties source) {
+ SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+
+ // could be UEB or DMAAP, so add both
+ addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+ addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+
+ return props;
+ }
+
+ /**
+ * Adds DMaaP consumer properties, consumer group & instance. The group is the host
+ * and the instance is a constant.
+ *
+ * @param props where to add the new properties
+ * @param prefix property prefix
+ */
+ private void addDmaapConsumerProps(SpecProperties props, String prefix) {
+ String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
+
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
+ }
+
+ /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*
@@ -288,8 +312,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
scheduler = null;
if (!(current instanceof IdleState)) {
- dmaapMgr.stopConsumer(this);
changeState(new IdleState(this));
+ dmaapMgr.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
}
@@ -751,26 +775,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public CountDownLatch startDistributing(BucketAssignments asgn) {
- if (asgn == null) {
- return null;
- }
-
- logger.info("new assignments for topic {}", getTopic());
-
synchronized (curLocker) {
+ int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+ logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
+ if (asgn == null) {
+ return null;
+ }
+
/*
* publish the events from the event queue, but do it in a background thread so
* that the state machine can enter its correct state BEFORE we start processing
* the events
*/
CountDownLatch latch = new CountDownLatch(1);
-
+
new Thread(() -> {
synchronized (curLocker) {
if (assignments == null) {
+ latch.countDown();
return;
}
@@ -779,11 +804,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
while ((ev = eventq.poll()) != null) {
handle(ev);
}
-
+
latch.countDown();
}
}).start();
-
+
return latch;
}
@@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Creates a DMaaP manager.
*
- * @param props properties used to configure the manager
+ * @param topic name of the internal DMaaP topic
+ * @param props properties used to configure DMaaP
* @return a new DMaaP manager
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
- return new DmaapManager(props.getPoolingTopic(), props.getSource());
+ public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ return new DmaapManager(topic, props);
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
index 4e8de0d0..54319423 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled";
public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit";
public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds";
+ public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds";
public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds";
public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
- public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
/**
* Type of item that the extractors will be extracting.
@@ -94,10 +94,17 @@ public class PoolingProperties extends SpecPropertyConfiguration {
private long offlineAgeMs;
/**
+ * Time, in milliseconds, to wait for an "Offline" message to be published
+ * to DMaaP.
+ */
+ @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+ private long offlinePubWaitMs;
+
+ /**
* Time, in milliseconds, to wait for this host's heart beat during the
* start-up state.
*/
- @Property(name = START_HEARTBEAT_MS, defaultValue = "50000")
+ @Property(name = START_HEARTBEAT_MS, defaultValue = "100000")
private long startHeartbeatMs;
/**
@@ -123,19 +130,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
/**
* Time, in milliseconds, to wait between heart beat generations during
- * the active state.
+ * the active and start-up states.
*/
@Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
private long interHeartbeatMs;
/**
- * Time, in milliseconds, to wait for an "Offline" message to be published
- * to DMaaP.
- */
- @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
- private long offlinePubWaitMs;
-
- /**
* @param controllerName the name of the controller
* @param props set of properties used to configure this
* @throws PropertyException if an error occurs
@@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration {
return offlineAgeMs;
}
+ public long getOfflinePubWaitMs() {
+ return offlinePubWaitMs;
+ }
+
public long getStartHeartbeatMs() {
return startHeartbeatMs;
}
@@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public long getInterHeartbeatMs() {
return interHeartbeatMs;
}
-
- public long getOfflinePubWaitMs() {
- return offlinePubWaitMs;
- }
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
index 31b4e614..b62ea0a7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -40,7 +40,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any specialization
+ * @param prefix the property name prefix that appears before any specialization, may
+ * be ""
* @param specialization the property name specialization (e.g., session name)
*/
public SpecProperties(String prefix, String specialization) {
@@ -50,7 +51,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any specialization
+ * @param prefix the property name prefix that appears before any specialization, may
+ * be ""
* @param specialization the property name specialization (e.g., session name)
* @param props the default properties
*/
@@ -68,7 +70,7 @@ public class SpecProperties extends Properties {
* @return the text, with a trailing "."
*/
private static String withTrailingDot(String text) {
- return text.endsWith(".") ? text : text + ".";
+ return text.isEmpty() || text.endsWith(".") ? text : text + ".";
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
index 2752ca8c..ccca77be 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -374,7 +374,7 @@ public class ClassExtractors {
} catch (NoSuchMethodException expected) {
// no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName());
+ logger.debug("no method {} in {}", nm, clazz.getName(), expected);
return null;
} catch (SecurityException e) {
@@ -445,7 +445,7 @@ public class ClassExtractors {
} catch (NoSuchFieldException expected) {
// no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName());
+ logger.debug("no field {} in {}", name, clazz.getName(), expected);
} catch (SecurityException e) {
throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
index b0a36cd9..8f0a902a 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState {
if ((succHost = assigned.higher(getHost())) == null) {
// wrapped around - successor is the first host in the set
succHost = assigned.first();
- logger.info("this host's successor is {} on topic {}", succHost, getTopic());
}
+ logger.info("this host's successor is {} on topic {}", succHost, getTopic());
if ((predHost = assigned.lower(getHost())) == null) {
// wrapped around - predecessor is the last host in the set
predHost = assigned.last();
- logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
+ logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
@Override
public void start() {
+ super.start();
addTimers();
genHeartbeat();
}
@@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState {
/*
* heart beat generator
*/
- long genMs = getProperties().getActiveHeartbeatMs();
+ long genMs = getProperties().getInterHeartbeatMs();
scheduleWithFixedDelay(genMs, genMs, () -> {
genHeartbeat();
@@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState {
/*
* my heart beat checker
*/
- long interMs = getProperties().getInterHeartbeatMs();
+ long waitMs = getProperties().getActiveHeartbeatMs();
- scheduleWithFixedDelay(interMs, interMs, () -> {
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
if (myHeartbeatSeen) {
myHeartbeatSeen = false;
return null;
@@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState {
// missed my heart beat
logger.error("missed my heartbeat on topic {}", getTopic());
- return internalTopicFailed();
+ return missedHeartbeat();
});
/*
@@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState {
*/
if (!predHost.isEmpty()) {
- scheduleWithFixedDelay(interMs, interMs, () -> {
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
if (predHeartbeatSeen) {
predHeartbeatSeen = false;
return null;
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index 6be2fb84..da044252 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -44,24 +44,17 @@ public class InactiveState extends State {
@Override
public void start() {
-
super.start();
-
schedule(getProperties().getReactivateMs(), () -> goStart());
}
@Override
public State process(Leader msg) {
- if(isValid(msg)) {
+ if (isValid(msg)) {
logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
- startDistributing(msg.getAssignments());
-
- if(msg.getAssignments().hasAssignment(getHost())) {
- logger.info("received Leader message on topic {}", getTopic());
- return goActive();
- }
+ return goActive(msg.getAssignments());
}
-
+
return null;
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
index 1e9bb581..e9dc0324 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -74,24 +74,6 @@ public class ProcessingState extends State {
}
/**
- * Goes active with a new set of assignments.
- *
- * @param asgn new assignments
- * @return the new state, either Active or Inactive, depending on whether or not this
- * host has an assignment
- */
- protected State goActive(BucketAssignments asgn) {
- startDistributing(asgn);
-
- if (asgn.hasAssignment(getHost())) {
- return goActive();
-
- } else {
- return goInactive();
- }
- }
-
- /**
* Generates an Identification message and goes to the query state.
*/
@Override
@@ -154,11 +136,11 @@ public class ProcessingState extends State {
}
Leader msg = makeLeader(alive);
- publish(msg);
+ logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
- setAssignments(msg.getAssignments());
+ publish(msg);
- return goActive();
+ return goActive(msg.getAssignments());
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
index 9045165b..1a4da150 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -63,7 +63,6 @@ public class QueryState extends ProcessingState {
@Override
public void start() {
-
super.start();
// start identification timer
@@ -85,39 +84,21 @@ public class QueryState extends ProcessingState {
if (!sawSelfIdent) {
// didn't see our identification
logger.error("missed our own Ident message on topic {}", getTopic());
- return internalTopicFailed();
+ return missedHeartbeat();
} else if (isLeader()) {
// "this" host is the new leader
logger.info("this host is the new leader for topic {}", getTopic());
return becomeLeader(alive);
- } else if (hasAssignment()) {
- /*
- * this host is not the new leader, but it does have an assignment -
- * return to the active state while we wait for the leader
- */
- logger.info("no new leader on topic {}", getTopic());
- return goActive();
-
} else {
- // not the leader and no assignment yet
+ // not the leader - return to previous state
logger.info("no new leader on topic {}", getTopic());
- return goInactive();
+ return goActive(getAssignments());
}
});
}
- /**
- * Determines if this host has an assignment in the CURRENT assignments.
- *
- * @return {@code true} if this host has an assignment, {@code false} otherwise
- */
- protected boolean hasAssignment() {
- BucketAssignments asgn = getAssignments();
- return (asgn != null && asgn.hasAssignment(getHost()));
- }
-
@Override
public State goQuery() {
return null;
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index a978e245..3068cfc9 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -67,8 +67,22 @@ public class StartState extends State {
super.start();
- publish(getHost(), makeHeartbeat(hbTimestampMs));
+ Heartbeat hb = makeHeartbeat(hbTimestampMs);
+ publish(getHost(), hb);
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, () -> {
+ publish(getHost(), hb);
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
schedule(getProperties().getStartHeartbeatMs(), () -> {
logger.error("missed heartbeat on topic {}", getTopic());
return internalTopicFailed();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 421b9225..54e93230 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -113,12 +113,21 @@ public abstract class State {
}
/**
- * Transitions to the "active" state.
+ * Goes active with a new set of assignments.
*
- * @return the new state
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
*/
- public final State goActive() {
- return mgr.goActive();
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn.hasAssignment(getHost())) {
+ return mgr.goActive();
+
+ } else {
+ return goInactive();
+ }
}
/**
@@ -322,7 +331,21 @@ public abstract class State {
}
/**
- * Indicates that the internal topic failed.
+ * Indicates that we failed to see our own heartbeat; must be a problem with the
+ * internal topic.
+ *
+ * @return a new {@link StartState}
+ */
+ protected final State missedHeartbeat() {
+ publish(makeOffline());
+ mgr.startDistributing(null);
+
+ return mgr.goStart();
+ }
+
+ /**
+ * Indicates that the internal topic failed; this should only be invoked from the
+ * StartState.
*
* @return a new {@link InactiveState}
*/