aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java209
1 files changed, 209 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
new file mode 100644
index 00000000..57521960
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -0,0 +1,209 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+
+// TODO add logging
+
+/**
+ * The Query state. In this state, the host waits for the other hosts to
+ * identify themselves. Eventually, a leader should come forth. If not, it will
+ * transition to the active or inactive state, depending on whether or not it
+ * has an assignment in the current bucket assignments. The other possibility is
+ * that it may <i>become</i> the leader, in which case it will also transition
+ * to the active state.
+ */
+public class QueryState extends ProcessingState {
+
+ /**
+ * Hosts that have sent an "Identification" message. Always includes this
+ * host.
+ */
+ private TreeSet<String> alive = new TreeSet<>();
+
+ /**
+ *
+ * @param mgr
+ */
+ public QueryState(PoolingManager mgr) {
+ // this host is the leader, until a better candidate identifies itself
+ super(mgr, mgr.getHost());
+
+ alive.add(getHost());
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ // start identification timer
+ awaitIdentification();
+ }
+
+ /**
+ * Starts a timer to wait for all Identification messages to arrive.
+ */
+ private void awaitIdentification() {
+
+ /*
+ * Once we've waited long enough for all Identification messages to
+ * arrive, become the leader, assuming we should.
+ */
+
+ schedule(getProperties().getIdentificationMs(), xxx -> {
+
+ if (isLeader()) {
+ // "this" host is the new leader
+ return becomeLeader(alive);
+
+ } else if (hasAssignment()) {
+ /*
+ * this host is not the new leader, but it does have an
+ * assignment - return to the active state while we wait for the
+ * leader
+ */
+ return goActive();
+
+ } else {
+ // not the leader and no assignment yet
+ return goInactive();
+ }
+ });
+ }
+
+ /**
+ * Remains in this state.
+ */
+ @Override
+ public State goQuery() {
+ return null;
+ }
+
+ /**
+ * Determines if this host has an assignment in the CURRENT assignments.
+ *
+ * @return {@code true} if this host has an assignment, {@code false}
+ * otherwise
+ */
+ protected boolean hasAssignment() {
+ BucketAssignments asgn = getAssignments();
+ return (asgn != null && asgn.hasAssignment(getHost()));
+ }
+
+ @Override
+ public State process(Identification msg) {
+
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * If the message leader is better than the leader we have, then go active
+ * with it. Otherwise, simply treat it like an {@link Identification}
+ * message.
+ */
+ @Override
+ public State process(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ return null;
+ }
+
+ // ignore Leader messages from ourself
+ String source = msg.getSource();
+ if (source == null || source.equals(getHost())) {
+ return null;
+ }
+
+ // the new leader must equal the source
+ if (!source.equals(asgn.getLeader())) {
+ return null;
+ }
+
+ // go active, if this has a better leader than the one we have
+ if (source.compareTo(getLeader()) < 0) {
+ return super.process(msg);
+ }
+
+ /*
+ * The message does not have an acceptable leader, but we'll still
+ * record its info.
+ */
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * Records info from a message, adding the source host name to
+ * {@link #alive}, and updating the bucket assignments.
+ *
+ * @param source the message's source host
+ * @param assignments assignments, or {@code null}
+ */
+ private void recordInfo(String source, BucketAssignments assignments) {
+ // add this message's source host to "alive"
+ if (source != null) {
+ alive.add(source);
+ setLeader(alive.first());
+ }
+
+ if (assignments == null || assignments.getLeader() == null) {
+ return;
+ }
+
+ // record assignments, if we don't have any yet
+ BucketAssignments current = getAssignments();
+ if (current == null) {
+ setAssignments(assignments);
+ return;
+ }
+
+ /*
+ * Record assignments, if the new assignments have a better (i.e.,
+ * lesser) leader.
+ */
+ String curldr = current.getLeader();
+ if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) {
+ setAssignments(assignments);
+ }
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ alive.remove(host);
+ setLeader(alive.first());
+ }
+
+ return null;
+ }
+}