diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java | 96 |
1 files changed, 56 insertions, 40 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index 2f830c66..1e9bb581 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -30,15 +30,13 @@ import java.util.SortedSet; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Any state in which events are being processed locally and forwarded, as - * appropriate. + * Any state in which events are being processed locally and forwarded, as appropriate. */ public class ProcessingState extends State { @@ -52,8 +50,8 @@ public class ProcessingState extends State { /** * * @param mgr - * @param leader current known leader, which need not be the same as the - * assignment leader. Never {@code null} + * @param leader current known leader, which need not be the same as the assignment + * leader. Never {@code null} * @throws IllegalArgumentException if an argument is invalid */ public ProcessingState(PoolingManager mgr, String leader) { @@ -76,21 +74,31 @@ public class ProcessingState extends State { } /** - * Generates an Identification message and returns {@code null}. + * Goes active with a new set of assignments. + * + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment */ - @Override - public State process(Query msg) { - publish(makeIdentification()); - return goQuery(); + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn.hasAssignment(getHost())) { + return goActive(); + + } else { + return goInactive(); + } } /** - * Makes an Identification message. - * - * @return a new message + * Generates an Identification message and goes to the query state. */ - protected Identification makeIdentification() { - return new Identification(getHost(), getAssignments()); + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** @@ -132,18 +140,17 @@ public class ProcessingState extends State { } /** - * Becomes the leader. Publishes a Leader message and enters the - * {@link ActiveState}. + * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}. * * @param alive hosts that are known to be alive * * @return the new state */ protected State becomeLeader(SortedSet<String> alive) { - String leader = getHost(); + String newLeader = getHost(); - if (!leader.equals(alive.first())) { - throw new IllegalArgumentException(leader + " cannot replace " + alive.first()); + if (!newLeader.equals(alive.first())) { + throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first()); } Leader msg = makeLeader(alive); @@ -155,8 +162,8 @@ public class ProcessingState extends State { } /** - * Makes a leader message. Assumes "this" host is the leader, and thus - * appears as the first host in the set of hosts that are still alive. + * Makes a leader message. Assumes "this" host is the leader, and thus appears as the + * first host in the set of hosts that are still alive. * * @param alive hosts that are known to be alive * @@ -222,8 +229,8 @@ public class ProcessingState extends State { } /** - * Removes excess hosts from the set of available hosts. Assumes "this" host - * is the leader, and thus appears as the first host in the set. + * Removes excess hosts from the set of available hosts. Assumes "this" host is the + * leader, and thus appears as the first host in the set. * * @param maxHosts maximum number of hosts to be retained * @param avail available hosts @@ -231,9 +238,9 @@ public class ProcessingState extends State { private void removeExcessHosts(int maxHosts, SortedSet<String> avail) { while (avail.size() > maxHosts) { /* - * Don't remove this host, as it's the leader. Since the leader is - * always at the front of the sorted set, we'll just pick off hosts - * from the back of the set. + * Don't remove this host, as it's the leader. Since the leader is always at + * the front of the sorted set, we'll just pick off hosts from the back of the + * set. */ String host = avail.last(); avail.remove(host); @@ -243,15 +250,15 @@ public class ProcessingState extends State { } /** - * Adds bucket indices to {@link HostBucket} objects. Buckets that are - * unassigned or assigned to a host that does not appear within the map are - * re-assigned to a host that appears within the map. + * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or + * assigned to a host that does not appear within the map are re-assigned to a host + * that appears within the map. * * @param bucket2host bucket assignments * @param host2data maps a host name to its {@link HostBucket} */ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) { - LinkedList<Integer> nullBuckets = new LinkedList<Integer>(); + LinkedList<Integer> nullBuckets = new LinkedList<>(); for (int x = 0; x < bucket2host.length; ++x) { String host = bucket2host[x]; @@ -274,10 +281,9 @@ public class ProcessingState extends State { } /** - * Assigns null buckets (i.e., those having no assignment) to available - * hosts. + * Assigns null buckets (i.e., those having no assignment) to available hosts. * - * @param buckets available hosts + * @param buckets buckets that still need to be assigned to hosts * @param coll collection of current host-bucket assignments */ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) { @@ -295,9 +301,9 @@ public class ProcessingState extends State { } /** - * Re-balances the buckets, taking from those that have a larger count and - * giving to those that have a smaller count. Populates an output array with - * the new assignments. + * Re-balances the buckets, taking from those that have a larger count and giving to + * those that have a smaller count. Populates an output array with the new + * assignments. * * @param coll current bucket assignment * @param bucket2host array to be populated with the new assignments @@ -349,7 +355,7 @@ public class ProcessingState extends State { /** * Tracks buckets that have been assigned to a host. */ - public static class HostBucket implements Comparable<HostBucket> { + protected static class HostBucket implements Comparable<HostBucket> { /** * Host to which the buckets have been assigned. */ @@ -395,8 +401,8 @@ public class ProcessingState extends State { } /** - * Compares host buckets, first by the number of buckets, and then by - * the host name. + * Compares host buckets, first by the number of buckets, and then by the host + * name. */ @Override public final int compareTo(HostBucket other) { @@ -406,5 +412,15 @@ public class ProcessingState extends State { } return d; } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } } } |