summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java39
1 files changed, 35 insertions, 4 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index 1c8e4dcc..6be2fb84 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -21,13 +21,19 @@
package org.onap.policy.drools.pooling.state;
import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The inactive state. In this state, we just wait a bit and then try to
- * re-activate. In the meantime, all messages are ignored.
+ * The inactive state. In this state, we just wait a bit and then try to re-activate. In
+ * the meantime, all messages are ignored.
*/
public class InactiveState extends State {
+ private static final Logger logger = LoggerFactory.getLogger(InactiveState.class);
+
/**
*
* @param mgr
@@ -41,11 +47,36 @@ public class InactiveState extends State {
super.start();
- schedule(getProperties().getReactivateMs(), xxx -> goStart());
+ schedule(getProperties().getReactivateMs(), () -> goStart());
+ }
+
+ @Override
+ public State process(Leader msg) {
+ if(isValid(msg)) {
+ logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+
+ if(msg.getAssignments().hasAssignment(getHost())) {
+ logger.info("received Leader message on topic {}", getTopic());
+ return goActive();
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Generates an Identification message and goes to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
}
/**
- * Remains in this state.
+ * Remains in this state, without resetting any timers.
*/
@Override
protected State goInactive() {