aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java')
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java373
1 files changed, 373 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java
new file mode 100644
index 00000000..e2cf9586
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -0,0 +1,373 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A state in the finite state machine.
+ *
+ * <p>A state may have several timers associated with it, which must be cancelled whenever
+ * the state is changed. Assumes that timers are not continuously added to the same state.
+ */
+public abstract class State {
+
+ private static final Logger logger = LoggerFactory.getLogger(State.class);
+
+ /**
+ * Host pool manager.
+ */
+ private final PoolingManager mgr;
+
+ /**
+ * Timers added by this state.
+ */
+ private final List<CancellableScheduledTask> timers = new LinkedList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ */
+ protected State(PoolingManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Cancels the timers added by this state.
+ */
+ public final void cancelTimers() {
+ timers.forEach(CancellableScheduledTask::cancel);
+ }
+
+ /**
+ * Starts the state. The default method simply logs a message and returns.
+ */
+ public void start() {
+ logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
+ }
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ public final State goStart() {
+ return mgr.goStart();
+ }
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ public State goQuery() {
+ return mgr.goQuery();
+ }
+
+ /**
+ * Goes active with a new set of assignments.
+ *
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
+ */
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn != null && asgn.hasAssignment(getHost())) {
+ return mgr.goActive();
+
+ } else {
+ return goInactive();
+ }
+ }
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ protected State goInactive() {
+ return mgr.goInactive();
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Heartbeat msg) {
+ logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Identification msg) {
+ logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method copies the assignments and then returns
+ * {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Leader msg) {
+ if (isValid(msg)) {
+ logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Offline msg) {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Query msg) {
+ logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Determines if a message is valid and did not originate from this host.
+ *
+ * @param msg message to be validated
+ * @return {@code true} if the message is valid, {@code false} otherwise
+ */
+ protected boolean isValid(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
+ return false;
+ }
+
+ // ignore Leader messages from ourself
+ String source = msg.getSource();
+ if (source == null || source.equals(getHost())) {
+ logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
+ return false;
+ }
+
+ // the new leader must equal the source
+ boolean result = source.equals(asgn.getLeader());
+
+ if (!result) {
+ logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
+ }
+
+ return result;
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Identification msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Leader msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Offline msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Query msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel channel
+ * @param msg message to be published
+ */
+ protected final void publish(String channel, Heartbeat msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Starts distributing messages using the specified bucket assignments.
+ *
+ * @param assignments assignments
+ */
+ protected final void startDistributing(BucketAssignments assignments) {
+ if (assignments != null) {
+ mgr.startDistributing(assignments);
+ }
+ }
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs delay in ms
+ * @param task task
+ */
+ protected final void schedule(long delayMs, StateTimerTask task) {
+ timers.add(mgr.schedule(delayMs, task));
+ }
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs initial delay ms
+ * @param delayMs delay ms
+ * @param task task
+ */
+ protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
+ }
+
+ /**
+ * Indicates that we failed to see our own heartbeat; must be a problem with the
+ * internal topic. Assumes the problem is temporary and continues to use the current
+ * bucket assignments.
+ *
+ * @return a new {@link StartState}
+ */
+ protected final State missedHeartbeat() {
+ publish(makeOffline());
+
+ return mgr.goStart();
+ }
+
+ /**
+ * Indicates that the internal topic failed; this should only be invoked from the
+ * StartState. Discards bucket assignments and begins processing everything locally.
+ *
+ * @return a new {@link InactiveState}
+ */
+ protected final State internalTopicFailed() {
+ publish(makeOffline());
+ mgr.startDistributing(null);
+
+ return mgr.goInactive();
+ }
+
+ /**
+ * Makes a heart beat message.
+ *
+ * @param timestampMs time, in milliseconds, associated with the message
+ *
+ * @return a new message
+ */
+ protected final Heartbeat makeHeartbeat(long timestampMs) {
+ return new Heartbeat(getHost(), timestampMs);
+ }
+
+ /**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
+ * Makes an "offline" message.
+ *
+ * @return a new message
+ */
+ protected final Offline makeOffline() {
+ return new Offline(getHost());
+ }
+
+ /**
+ * Makes a query message.
+ *
+ * @return a new message
+ */
+ protected final Query makeQuery() {
+ return new Query(getHost());
+ }
+
+ public final BucketAssignments getAssignments() {
+ return mgr.getAssignments();
+ }
+
+ public final String getHost() {
+ return mgr.getHost();
+ }
+
+ public final String getTopic() {
+ return mgr.getTopic();
+ }
+
+ public final PoolingProperties getProperties() {
+ return mgr.getProperties();
+ }
+}