aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java255
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java85
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java55
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java410
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java209
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java132
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java370
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java37
9 files changed, 1649 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
new file mode 100644
index 00000000..5f503a3b
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -0,0 +1,255 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Arrays;
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * The active state. In this state, this host has one more more bucket
+ * assignments and processes any events associated with one of its buckets.
+ * Other events are forwarded to appropriate target hosts.
+ */
+public class ActiveState extends ProcessingState {
+
+ /**
+ * Set of hosts that have been assigned a bucket.
+ */
+ private final TreeSet<String> assigned = new TreeSet<>();
+
+ /**
+ * Host that comes after this host, or {@code null} if it has no successor.
+ */
+ private String succHost = null;
+
+ /**
+ * Host that comes before this host, or "" if it has no predecessor.
+ */
+ private String predHost = "";
+
+ /**
+ * {@code True} if we saw this host's heart beat since the last check,
+ * {@code false} otherwise.
+ */
+ private boolean myHeartbeatSeen = false;
+
+ /**
+ * {@code True} if we saw the predecessor's heart beat since the last check,
+ * {@code false} otherwise.
+ */
+ private boolean predHeartbeatSeen = false;
+
+ /**
+ *
+ * @param mgr
+ */
+ public ActiveState(PoolingManager mgr) {
+ super(mgr, mgr.getAssignments().getLeader());
+
+ assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
+
+ detmNeighbors();
+ }
+
+ /**
+ * Determine this host's neighbors based on the order of the host UUIDs.
+ * Updates {@link #succHost} and {@link #predHost}.
+ */
+ private void detmNeighbors() {
+ if (assigned.size() < 2) {
+ /*
+ * this host is the only one with any assignments - it has no
+ * neighbors
+ */
+ succHost = null;
+ predHost = "";
+ return;
+ }
+
+ if ((succHost = assigned.higher(getHost())) == null) {
+ // wrapped around - successor is the first host in the set
+ succHost = assigned.first();
+ }
+
+ if ((predHost = assigned.lower(getHost())) == null) {
+ // wrapped around - predecessor is the last host in the set
+ predHost = assigned.last();
+ }
+ }
+
+ @Override
+ public void start() {
+ addTimers();
+ genHeartbeat();
+ }
+
+ /**
+ * Adds the timers.
+ */
+ private void addTimers() {
+
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getActiveHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, xxx -> {
+ genHeartbeat();
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
+ long interMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ if (myHeartbeatSeen) {
+ myHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed my heart beat
+
+ return internalTopicFailed();
+ });
+
+ /*
+ * predecessor heart beat checker
+ */
+ if (!predHost.isEmpty()) {
+
+ scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ if (predHeartbeatSeen) {
+ predHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed the predecessor's heart beat
+ publish(makeQuery());
+
+ return goQuery();
+ });
+ }
+ }
+
+ /**
+ * Generates a heart beat for this host and its successor.
+ */
+ private void genHeartbeat() {
+ Heartbeat msg = makeHeartbeat(System.currentTimeMillis());
+ publish(getHost(), msg);
+
+ if (succHost != null) {
+ publish(succHost, msg);
+ }
+ }
+
+ @Override
+ public State process(Heartbeat msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ return null;
+
+ } else if (src.equals(getHost())) {
+ myHeartbeatSeen = true;
+
+ } else if (src.equals(predHost)) {
+ predHeartbeatSeen = true;
+
+ }
+
+ return null;
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ return null;
+
+ } else if (!assigned.contains(src)) {
+ /*
+ * the offline host wasn't assigned any buckets, so just ignore the
+ * message
+ */
+ return null;
+
+ } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
+ /*
+ * Case 1: We are the leader.
+ *
+ * Case 2: Our predecessor was the leader and it has gone offline -
+ * we should become the leader.
+ *
+ * In either case, we are now the leader and we must re-balance the
+ * buckets since one of the hosts has gone offline.
+ */
+
+ assigned.remove(src);
+
+ return becomeLeader(assigned);
+
+ } else {
+ /*
+ * Otherwise, we don't care right now - we'll wait for the leader to
+ * tell us it's been removed.
+ */
+ return null;
+ }
+ }
+
+ /**
+ * Transitions to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ State next = super.process(msg);
+ if (next != null) {
+ return next;
+ }
+
+ return goQuery();
+ }
+
+ protected String getSuccHost() {
+ return succHost;
+ }
+
+ protected String getPredHost() {
+ return predHost;
+ }
+
+ protected boolean isMyHeartbeatSeen() {
+ return myHeartbeatSeen;
+ }
+
+ protected boolean isPredHeartbeatSeen() {
+ return predHeartbeatSeen;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
new file mode 100644
index 00000000..a2da0ea2
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
+ * should only contain a small number of items.
+ */
+public class FilterUtils {
+ // message element names
+ public static final String MSG_CHANNEL = "channel";
+ public static final String MSG_TIMESTAMP = "timestampMs";
+
+ // json element names
+ protected static final String JSON_CLASS = "class";
+ protected static final String JSON_FILTERS = "filters";
+ protected static final String JSON_FIELD = "field";
+ protected static final String JSON_VALUE = "value";
+
+ // values to be stuck into the "class" element
+ protected static final String CLASS_OR = "Or";
+ protected static final String CLASS_AND = "And";
+ protected static final String CLASS_EQUALS = "Equals";
+
+ /**
+ *
+ */
+ private FilterUtils() {
+ super();
+ }
+
+ /**
+ * Makes a filter that verifies that a field equals a value.
+ *
+ * @param field name of the field to check
+ * @param value desired value
+ * @return a map representing an "equals" filter
+ */
+ public static Map<String, Object> makeEquals(String field, String value) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_EQUALS);
+ map.put(JSON_FIELD, field);
+ map.put(JSON_VALUE, value);
+
+ return map;
+ }
+
+ /**
+ * Makes an "and" filter, where all of the items must be true.
+ *
+ * @param items items to be checked
+ * @return an "and" filter
+ */
+ public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_AND);
+ map.put(JSON_FILTERS, items);
+
+ return map;
+ }
+
+ /**
+ * Makes an "or" filter, where at least one of the items must be true.
+ *
+ * @param items items to be checked
+ * @return an "or" filter
+ */
+ public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_OR);
+ map.put(JSON_FILTERS, items);
+
+ return map;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
new file mode 100644
index 00000000..27678360
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * Idle state, used when offline.
+ */
+public class IdleState extends State {
+
+ public IdleState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void stop() {
+ // do nothing - don't even send of "offline" message
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Heartbeat msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Copies the assignments, but doesn't change states.
+ */
+ @Override
+ public State process(Leader msg) {
+ super.process(msg);
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Query msg) {
+ return null;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
new file mode 100644
index 00000000..1c8e4dcc
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+
+/**
+ * The inactive state. In this state, we just wait a bit and then try to
+ * re-activate. In the meantime, all messages are ignored.
+ */
+public class InactiveState extends State {
+
+ /**
+ *
+ * @param mgr
+ */
+ public InactiveState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ schedule(getProperties().getReactivateMs(), xxx -> goStart());
+ }
+
+ /**
+ * Remains in this state.
+ */
+ @Override
+ protected State goInactive() {
+ return null;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
new file mode 100644
index 00000000..2f830c66
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -0,0 +1,410 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Any state in which events are being processed locally and forwarded, as
+ * appropriate.
+ */
+public class ProcessingState extends State {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
+
+ /**
+ * Current known leader, never {@code null}.
+ */
+ private String leader;
+
+ /**
+ *
+ * @param mgr
+ * @param leader current known leader, which need not be the same as the
+ * assignment leader. Never {@code null}
+ * @throws IllegalArgumentException if an argument is invalid
+ */
+ public ProcessingState(PoolingManager mgr, String leader) {
+ super(mgr);
+
+ if (leader == null) {
+ throw new IllegalArgumentException("null leader");
+ }
+
+ BucketAssignments assignments = mgr.getAssignments();
+
+ if (assignments != null) {
+ String[] arr = assignments.getHostArray();
+ if (arr != null && arr.length == 0) {
+ throw new IllegalArgumentException("zero-length bucket assignments");
+ }
+ }
+
+ this.leader = leader;
+ }
+
+ /**
+ * Generates an Identification message and returns {@code null}.
+ */
+ @Override
+ public State process(Query msg) {
+ publish(makeIdentification());
+ return goQuery();
+ }
+
+ /**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
+ * Sets the assignments.
+ *
+ * @param assignments new assignments, or {@code null}
+ */
+ protected final void setAssignments(BucketAssignments assignments) {
+ if (assignments != null) {
+ startDistributing(assignments);
+ }
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ /**
+ * Sets the leader.
+ *
+ * @param leader the new leader
+ * @throws IllegalArgumentException if an argument is invalid
+ */
+ protected void setLeader(String leader) {
+ if (leader == null) {
+ throw new IllegalArgumentException("null leader");
+ }
+
+ this.leader = leader;
+ }
+
+ /**
+ * Determines if this host is the leader, based on the current assignments.
+ *
+ * @return {@code true} if this host is the leader, {@code false} otherwise
+ */
+ public boolean isLeader() {
+ return getHost().equals(leader);
+ }
+
+ /**
+ * Becomes the leader. Publishes a Leader message and enters the
+ * {@link ActiveState}.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return the new state
+ */
+ protected State becomeLeader(SortedSet<String> alive) {
+ String leader = getHost();
+
+ if (!leader.equals(alive.first())) {
+ throw new IllegalArgumentException(leader + " cannot replace " + alive.first());
+ }
+
+ Leader msg = makeLeader(alive);
+ publish(msg);
+
+ setAssignments(msg.getAssignments());
+
+ return goActive();
+ }
+
+ /**
+ * Makes a leader message. Assumes "this" host is the leader, and thus
+ * appears as the first host in the set of hosts that are still alive.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new message
+ */
+ private Leader makeLeader(Set<String> alive) {
+ return new Leader(getHost(), makeAssignments(alive));
+ }
+
+ /**
+ * Makes a set of bucket assignments. Assumes "this" host is the leader.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new set of bucket assignments
+ */
+ private BucketAssignments makeAssignments(Set<String> alive) {
+
+ // make a working array from the CURRENT assignments
+ String[] bucket2host = makeBucketArray();
+
+ TreeSet<String> avail = new TreeSet<>(alive);
+
+ // if we have more hosts than buckets, then remove the extra hosts
+ removeExcessHosts(bucket2host.length, avail);
+
+ // create a host bucket for each available host
+ Map<String, HostBucket> host2hb = new HashMap<>();
+ avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
+
+ // add bucket indices to the appropriate host bucket
+ addIndicesToHostBuckets(bucket2host, host2hb);
+
+ // convert the collection back to an array
+ fillArray(host2hb.values(), bucket2host);
+
+ // update bucket2host with new assignments
+ rebalanceBuckets(host2hb.values(), bucket2host);
+
+ return new BucketAssignments(bucket2host);
+ }
+
+ /**
+ * Makes a bucket array, copying the current assignments, if available.
+ *
+ * @return a new bucket array
+ */
+ private String[] makeBucketArray() {
+ BucketAssignments asgn = getAssignments();
+ if (asgn == null) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ String[] oldArray = asgn.getHostArray();
+ if (oldArray.length == 0) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ String[] newArray = new String[oldArray.length];
+ System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
+
+ return newArray;
+ }
+
+ /**
+ * Removes excess hosts from the set of available hosts. Assumes "this" host
+ * is the leader, and thus appears as the first host in the set.
+ *
+ * @param maxHosts maximum number of hosts to be retained
+ * @param avail available hosts
+ */
+ private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
+ while (avail.size() > maxHosts) {
+ /*
+ * Don't remove this host, as it's the leader. Since the leader is
+ * always at the front of the sorted set, we'll just pick off hosts
+ * from the back of the set.
+ */
+ String host = avail.last();
+ avail.remove(host);
+
+ logger.warn("not using extra host {} for topic {}", host, getTopic());
+ }
+ }
+
+ /**
+ * Adds bucket indices to {@link HostBucket} objects. Buckets that are
+ * unassigned or assigned to a host that does not appear within the map are
+ * re-assigned to a host that appears within the map.
+ *
+ * @param bucket2host bucket assignments
+ * @param host2data maps a host name to its {@link HostBucket}
+ */
+ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
+ LinkedList<Integer> nullBuckets = new LinkedList<Integer>();
+
+ for (int x = 0; x < bucket2host.length; ++x) {
+ String host = bucket2host[x];
+ if (host == null) {
+ nullBuckets.add(x);
+
+ } else {
+ HostBucket hb = host2data.get(host);
+ if (hb == null) {
+ nullBuckets.add(x);
+
+ } else {
+ hb.add(x);
+ }
+ }
+ }
+
+ // assign the null buckets to other hosts
+ assignNullBuckets(nullBuckets, host2data.values());
+ }
+
+ /**
+ * Assigns null buckets (i.e., those having no assignment) to available
+ * hosts.
+ *
+ * @param buckets available hosts
+ * @param coll collection of current host-bucket assignments
+ */
+ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
+ // assign null buckets to the hosts with the fewest buckets
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (Integer index : buckets) {
+ // add it to the host with the shortest bucket list
+ HostBucket newhb = assignments.pollFirst();
+ newhb.add(index);
+
+ // put the item back into the queue, with its new count
+ assignments.add(newhb);
+ }
+ }
+
+ /**
+ * Re-balances the buckets, taking from those that have a larger count and
+ * giving to those that have a smaller count. Populates an output array with
+ * the new assignments.
+ *
+ * @param coll current bucket assignment
+ * @param bucket2host array to be populated with the new assignments
+ */
+ private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
+ if (coll.size() <= 1) {
+ // only one hosts - nothing to rebalance
+ return;
+ }
+
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (;;) {
+ HostBucket smaller = assignments.pollFirst();
+ HostBucket larger = assignments.pollLast();
+
+ if (larger.size() - smaller.size() <= 1) {
+ // it's as balanced as it will get
+ break;
+ }
+
+ // move the bucket from the larger to the smaller
+ Integer b = larger.remove();
+ smaller.add(b);
+
+ bucket2host[b] = smaller.host;
+
+ // put the items back, with their new counts
+ assignments.add(larger);
+ assignments.add(smaller);
+ }
+
+ }
+
+ /**
+ * Fills the array with the host assignments.
+ *
+ * @param coll the host assignments
+ * @param bucket2host array to be filled
+ */
+ private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
+ for (HostBucket hb : coll) {
+ for (Integer index : hb.buckets) {
+ bucket2host[index] = hb.host;
+ }
+ }
+ }
+
+ /**
+ * Tracks buckets that have been assigned to a host.
+ */
+ public static class HostBucket implements Comparable<HostBucket> {
+ /**
+ * Host to which the buckets have been assigned.
+ */
+ private String host;
+
+ /**
+ * Buckets that have been assigned to this host.
+ */
+ private Queue<Integer> buckets = new LinkedList<>();
+
+ /**
+ *
+ * @param host
+ */
+ public HostBucket(String host) {
+ this.host = host;
+ }
+
+ /**
+ * Removes the next bucket from the list.
+ *
+ * @return the next bucket
+ */
+ public final Integer remove() {
+ return buckets.remove();
+ }
+
+ /**
+ * Adds a bucket to the list.
+ *
+ * @param index index of the bucket to add
+ */
+ public final void add(Integer index) {
+ buckets.add(index);
+ }
+
+ /**
+ *
+ * @return the number of buckets assigned to this host
+ */
+ public final int size() {
+ return buckets.size();
+ }
+
+ /**
+ * Compares host buckets, first by the number of buckets, and then by
+ * the host name.
+ */
+ @Override
+ public final int compareTo(HostBucket other) {
+ int d = buckets.size() - other.buckets.size();
+ if (d == 0) {
+ d = host.compareTo(other.host);
+ }
+ return d;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
new file mode 100644
index 00000000..57521960
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -0,0 +1,209 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+
+// TODO add logging
+
+/**
+ * The Query state. In this state, the host waits for the other hosts to
+ * identify themselves. Eventually, a leader should come forth. If not, it will
+ * transition to the active or inactive state, depending on whether or not it
+ * has an assignment in the current bucket assignments. The other possibility is
+ * that it may <i>become</i> the leader, in which case it will also transition
+ * to the active state.
+ */
+public class QueryState extends ProcessingState {
+
+ /**
+ * Hosts that have sent an "Identification" message. Always includes this
+ * host.
+ */
+ private TreeSet<String> alive = new TreeSet<>();
+
+ /**
+ *
+ * @param mgr
+ */
+ public QueryState(PoolingManager mgr) {
+ // this host is the leader, until a better candidate identifies itself
+ super(mgr, mgr.getHost());
+
+ alive.add(getHost());
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ // start identification timer
+ awaitIdentification();
+ }
+
+ /**
+ * Starts a timer to wait for all Identification messages to arrive.
+ */
+ private void awaitIdentification() {
+
+ /*
+ * Once we've waited long enough for all Identification messages to
+ * arrive, become the leader, assuming we should.
+ */
+
+ schedule(getProperties().getIdentificationMs(), xxx -> {
+
+ if (isLeader()) {
+ // "this" host is the new leader
+ return becomeLeader(alive);
+
+ } else if (hasAssignment()) {
+ /*
+ * this host is not the new leader, but it does have an
+ * assignment - return to the active state while we wait for the
+ * leader
+ */
+ return goActive();
+
+ } else {
+ // not the leader and no assignment yet
+ return goInactive();
+ }
+ });
+ }
+
+ /**
+ * Remains in this state.
+ */
+ @Override
+ public State goQuery() {
+ return null;
+ }
+
+ /**
+ * Determines if this host has an assignment in the CURRENT assignments.
+ *
+ * @return {@code true} if this host has an assignment, {@code false}
+ * otherwise
+ */
+ protected boolean hasAssignment() {
+ BucketAssignments asgn = getAssignments();
+ return (asgn != null && asgn.hasAssignment(getHost()));
+ }
+
+ @Override
+ public State process(Identification msg) {
+
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * If the message leader is better than the leader we have, then go active
+ * with it. Otherwise, simply treat it like an {@link Identification}
+ * message.
+ */
+ @Override
+ public State process(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ return null;
+ }
+
+ // ignore Leader messages from ourself
+ String source = msg.getSource();
+ if (source == null || source.equals(getHost())) {
+ return null;
+ }
+
+ // the new leader must equal the source
+ if (!source.equals(asgn.getLeader())) {
+ return null;
+ }
+
+ // go active, if this has a better leader than the one we have
+ if (source.compareTo(getLeader()) < 0) {
+ return super.process(msg);
+ }
+
+ /*
+ * The message does not have an acceptable leader, but we'll still
+ * record its info.
+ */
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * Records info from a message, adding the source host name to
+ * {@link #alive}, and updating the bucket assignments.
+ *
+ * @param source the message's source host
+ * @param assignments assignments, or {@code null}
+ */
+ private void recordInfo(String source, BucketAssignments assignments) {
+ // add this message's source host to "alive"
+ if (source != null) {
+ alive.add(source);
+ setLeader(alive.first());
+ }
+
+ if (assignments == null || assignments.getLeader() == null) {
+ return;
+ }
+
+ // record assignments, if we don't have any yet
+ BucketAssignments current = getAssignments();
+ if (current == null) {
+ setAssignments(assignments);
+ return;
+ }
+
+ /*
+ * Record assignments, if the new assignments have a better (i.e.,
+ * lesser) leader.
+ */
+ String curldr = current.getLeader();
+ if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) {
+ setAssignments(assignments);
+ }
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ alive.remove(host);
+ setLeader(alive.first());
+ }
+
+ return null;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
new file mode 100644
index 00000000..decbdfda
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.Map;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * The start state. Upon entry, a heart beat is generated and the event filter
+ * is changed to look for just that particular message. Once the message is
+ * seen, it goes into the {@link QueryState}.
+ */
+public class StartState extends State {
+
+ /**
+ * Time stamp inserted into the heart beat message.
+ */
+ private long hbTimestampMs = System.currentTimeMillis();
+
+ /**
+ *
+ * @param mgr
+ */
+ public StartState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ /**
+ *
+ * @return the time stamp inserted into the heart beat message
+ */
+ public long getHbTimestampMs() {
+ return hbTimestampMs;
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ publish(getHost(), makeHeartbeat(hbTimestampMs));
+
+ schedule(getProperties().getStartHeartbeatMs(), xxx -> internalTopicFailed());
+ }
+
+ /**
+ * Transitions to the query state if the heart beat originated from this
+ * host and its time stamp matches.
+ */
+ @Override
+ public State process(Heartbeat msg) {
+ if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) {
+ // saw our own heart beat - transition to query state
+ publish(makeQuery());
+ return goQuery();
+ }
+
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Processes the assignments, but remains in the current state.
+ */
+ @Override
+ public State process(Leader msg) {
+ super.process(msg);
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Query msg) {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getFilter() {
+ // ignore everything except our most recent heart beat message
+ return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
+ makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
+
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
new file mode 100644
index 00000000..1e3a907e
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -0,0 +1,370 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * A state in the finite state machine.
+ * <p>
+ * A state may have several timers associated with it, which must be cancelled
+ * whenever the state is changed. Assumes that timers are not continuously added
+ * to the same state.
+ */
+public abstract class State {
+
+ /**
+ * Host pool manager.
+ */
+ private final PoolingManager mgr;
+
+ /**
+ * Timers added by this state.
+ */
+ private final List<ScheduledFuture<?>> timers = new LinkedList<>();
+
+ /**
+ *
+ * @param mgr
+ */
+ public State(PoolingManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Gets the server-side filter to use when polling the DMaaP internal topic.
+ * The default method returns a filter that accepts messages on the admin
+ * channel and on the host's own channel.
+ *
+ * @return the server-side filter to use.
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getFilter() {
+ return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
+ }
+
+ /**
+ * Cancels the timers added by this state.
+ */
+ public void cancelTimers() {
+ for (ScheduledFuture<?> fut : timers) {
+ fut.cancel(false);
+ }
+ }
+
+ /**
+ * Starts the state.
+ */
+ public void start() {
+
+ }
+
+ /**
+ * Indicates that the finite state machine is stopping. Sends an "offline"
+ * message to the other hosts.
+ */
+ public void stop() {
+ publish(makeOffline());
+ }
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ public State goStart() {
+ return mgr.goStart();
+ }
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ public State goQuery() {
+ return mgr.goQuery();
+ }
+
+ /**
+ * Transitions to the "active" state.
+ *
+ * @return the new state
+ */
+ public State goActive() {
+ return mgr.goActive();
+ }
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ protected State goInactive() {
+ return mgr.goInactive();
+ }
+
+ /**
+ * Processes a message. The default method passes it to the manager to
+ * handle and returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Forward msg) {
+ mgr.handle(msg);
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Heartbeat msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. If this host has a new assignment, then it
+ * transitions to the active state. Otherwise, it transitions to the
+ * inactive state.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ return null;
+ }
+
+ String source = msg.getSource();
+ if (source == null) {
+ return null;
+ }
+
+ // the new leader must equal the source
+ if (source.equals(asgn.getLeader())) {
+ startDistributing(asgn);
+
+ if (asgn.hasAssignment(getHost())) {
+ return goActive();
+
+ } else {
+ return goInactive();
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Query msg) {
+ return null;
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Identification msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Leader msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Offline msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Query msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel
+ * @param msg message to be published
+ */
+ protected void publish(String channel, Forward msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel
+ * @param msg message to be published
+ */
+ protected void publish(String channel, Heartbeat msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Starts distributing messages using the specified bucket assignments.
+ *
+ * @param assignments
+ */
+ protected void startDistributing(BucketAssignments assignments) {
+ if (assignments != null) {
+ mgr.startDistributing(assignments);
+ }
+ }
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs
+ * @param task
+ */
+ protected void schedule(long delayMs, StateTimerTask task) {
+ timers.add(mgr.schedule(delayMs, task));
+ }
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs
+ * @param delayMs
+ * @param task
+ */
+ protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
+ }
+
+ /**
+ * Indicates that the internal topic failed.
+ *
+ * @return a new {@link InactiveState}
+ */
+ protected State internalTopicFailed() {
+ publish(makeOffline());
+ mgr.internalTopicFailed();
+
+ return mgr.goInactive();
+ }
+
+ /**
+ * Makes a heart beat message.
+ *
+ * @param timestampMs time, in milliseconds, associated with the message
+ *
+ * @return a new message
+ */
+ protected Heartbeat makeHeartbeat(long timestampMs) {
+ return new Heartbeat(getHost(), timestampMs);
+ }
+
+ /**
+ * Makes an "offline" message.
+ *
+ * @return a new message
+ */
+ protected Offline makeOffline() {
+ return new Offline(getHost());
+ }
+
+ /**
+ * Makes a query message.
+ *
+ * @return a new message
+ */
+ protected Query makeQuery() {
+ return new Query(getHost());
+ }
+
+ public final BucketAssignments getAssignments() {
+ return mgr.getAssignments();
+ }
+
+ public final String getHost() {
+ return mgr.getHost();
+ }
+
+ public final String getTopic() {
+ return mgr.getTopic();
+ }
+
+ public final PoolingProperties getProperties() {
+ return mgr.getProperties();
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
new file mode 100644
index 00000000..bd388b4e
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+/**
+ * Task to be executed when a timer fires within a {@link State}.
+ */
+@FunctionalInterface
+public interface StateTimerTask {
+
+ /**
+ * Fires the timer.
+ *
+ * @param arg always {@code null}
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State fire(Void arg);
+
+}