diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java | 39 |
1 files changed, 35 insertions, 4 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index 1c8e4dcc..6be2fb84 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -21,13 +21,19 @@ package org.onap.policy.drools.pooling.state; import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The inactive state. In this state, we just wait a bit and then try to - * re-activate. In the meantime, all messages are ignored. + * The inactive state. In this state, we just wait a bit and then try to re-activate. In + * the meantime, all messages are ignored. */ public class InactiveState extends State { + private static final Logger logger = LoggerFactory.getLogger(InactiveState.class); + /** * * @param mgr @@ -41,11 +47,36 @@ public class InactiveState extends State { super.start(); - schedule(getProperties().getReactivateMs(), xxx -> goStart()); + schedule(getProperties().getReactivateMs(), () -> goStart()); + } + + @Override + public State process(Leader msg) { + if(isValid(msg)) { + logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + + if(msg.getAssignments().hasAssignment(getHost())) { + logger.info("received Leader message on topic {}", getTopic()); + return goActive(); + } + } + + return null; + } + + /** + * Generates an Identification message and goes to the query state. + */ + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** - * Remains in this state. + * Remains in this state, without resetting any timers. */ @Override protected State goInactive() { |