aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java96
1 files changed, 56 insertions, 40 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
index 2f830c66..1e9bb581 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -30,15 +30,13 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Any state in which events are being processed locally and forwarded, as
- * appropriate.
+ * Any state in which events are being processed locally and forwarded, as appropriate.
*/
public class ProcessingState extends State {
@@ -52,8 +50,8 @@ public class ProcessingState extends State {
/**
*
* @param mgr
- * @param leader current known leader, which need not be the same as the
- * assignment leader. Never {@code null}
+ * @param leader current known leader, which need not be the same as the assignment
+ * leader. Never {@code null}
* @throws IllegalArgumentException if an argument is invalid
*/
public ProcessingState(PoolingManager mgr, String leader) {
@@ -76,21 +74,31 @@ public class ProcessingState extends State {
}
/**
- * Generates an Identification message and returns {@code null}.
+ * Goes active with a new set of assignments.
+ *
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
*/
- @Override
- public State process(Query msg) {
- publish(makeIdentification());
- return goQuery();
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn.hasAssignment(getHost())) {
+ return goActive();
+
+ } else {
+ return goInactive();
+ }
}
/**
- * Makes an Identification message.
- *
- * @return a new message
+ * Generates an Identification message and goes to the query state.
*/
- protected Identification makeIdentification() {
- return new Identification(getHost(), getAssignments());
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
}
/**
@@ -132,18 +140,17 @@ public class ProcessingState extends State {
}
/**
- * Becomes the leader. Publishes a Leader message and enters the
- * {@link ActiveState}.
+ * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
*
* @param alive hosts that are known to be alive
*
* @return the new state
*/
protected State becomeLeader(SortedSet<String> alive) {
- String leader = getHost();
+ String newLeader = getHost();
- if (!leader.equals(alive.first())) {
- throw new IllegalArgumentException(leader + " cannot replace " + alive.first());
+ if (!newLeader.equals(alive.first())) {
+ throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
}
Leader msg = makeLeader(alive);
@@ -155,8 +162,8 @@ public class ProcessingState extends State {
}
/**
- * Makes a leader message. Assumes "this" host is the leader, and thus
- * appears as the first host in the set of hosts that are still alive.
+ * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
+ * first host in the set of hosts that are still alive.
*
* @param alive hosts that are known to be alive
*
@@ -222,8 +229,8 @@ public class ProcessingState extends State {
}
/**
- * Removes excess hosts from the set of available hosts. Assumes "this" host
- * is the leader, and thus appears as the first host in the set.
+ * Removes excess hosts from the set of available hosts. Assumes "this" host is the
+ * leader, and thus appears as the first host in the set.
*
* @param maxHosts maximum number of hosts to be retained
* @param avail available hosts
@@ -231,9 +238,9 @@ public class ProcessingState extends State {
private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
while (avail.size() > maxHosts) {
/*
- * Don't remove this host, as it's the leader. Since the leader is
- * always at the front of the sorted set, we'll just pick off hosts
- * from the back of the set.
+ * Don't remove this host, as it's the leader. Since the leader is always at
+ * the front of the sorted set, we'll just pick off hosts from the back of the
+ * set.
*/
String host = avail.last();
avail.remove(host);
@@ -243,15 +250,15 @@ public class ProcessingState extends State {
}
/**
- * Adds bucket indices to {@link HostBucket} objects. Buckets that are
- * unassigned or assigned to a host that does not appear within the map are
- * re-assigned to a host that appears within the map.
+ * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
+ * assigned to a host that does not appear within the map are re-assigned to a host
+ * that appears within the map.
*
* @param bucket2host bucket assignments
* @param host2data maps a host name to its {@link HostBucket}
*/
private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
- LinkedList<Integer> nullBuckets = new LinkedList<Integer>();
+ LinkedList<Integer> nullBuckets = new LinkedList<>();
for (int x = 0; x < bucket2host.length; ++x) {
String host = bucket2host[x];
@@ -274,10 +281,9 @@ public class ProcessingState extends State {
}
/**
- * Assigns null buckets (i.e., those having no assignment) to available
- * hosts.
+ * Assigns null buckets (i.e., those having no assignment) to available hosts.
*
- * @param buckets available hosts
+ * @param buckets buckets that still need to be assigned to hosts
* @param coll collection of current host-bucket assignments
*/
private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
@@ -295,9 +301,9 @@ public class ProcessingState extends State {
}
/**
- * Re-balances the buckets, taking from those that have a larger count and
- * giving to those that have a smaller count. Populates an output array with
- * the new assignments.
+ * Re-balances the buckets, taking from those that have a larger count and giving to
+ * those that have a smaller count. Populates an output array with the new
+ * assignments.
*
* @param coll current bucket assignment
* @param bucket2host array to be populated with the new assignments
@@ -349,7 +355,7 @@ public class ProcessingState extends State {
/**
* Tracks buckets that have been assigned to a host.
*/
- public static class HostBucket implements Comparable<HostBucket> {
+ protected static class HostBucket implements Comparable<HostBucket> {
/**
* Host to which the buckets have been assigned.
*/
@@ -395,8 +401,8 @@ public class ProcessingState extends State {
}
/**
- * Compares host buckets, first by the number of buckets, and then by
- * the host name.
+ * Compares host buckets, first by the number of buckets, and then by the host
+ * name.
*/
@Override
public final int compareTo(HostBucket other) {
@@ -406,5 +412,15 @@ public class ProcessingState extends State {
}
return d;
}
+
+ @Override
+ public final int hashCode() {
+ throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ throw new UnsupportedOperationException("cannot compare HostBuckets");
+ }
}
}