diff options
Diffstat (limited to 'feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java')
-rw-r--r-- | feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java new file mode 100644 index 00000000..e2cf9586 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -0,0 +1,373 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import java.util.LinkedList; +import java.util.List; +import org.onap.policy.drools.pooling.CancellableScheduledTask; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.PoolingProperties; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A state in the finite state machine. + * + * <p>A state may have several timers associated with it, which must be cancelled whenever + * the state is changed. Assumes that timers are not continuously added to the same state. + */ +public abstract class State { + + private static final Logger logger = LoggerFactory.getLogger(State.class); + + /** + * Host pool manager. + */ + private final PoolingManager mgr; + + /** + * Timers added by this state. + */ + private final List<CancellableScheduledTask> timers = new LinkedList<>(); + + /** + * Constructor. + * + * @param mgr pooling manager + */ + protected State(PoolingManager mgr) { + this.mgr = mgr; + } + + /** + * Cancels the timers added by this state. + */ + public final void cancelTimers() { + timers.forEach(CancellableScheduledTask::cancel); + } + + /** + * Starts the state. The default method simply logs a message and returns. + */ + public void start() { + logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic()); + } + + /** + * Transitions to the "start" state. + * + * @return the new state + */ + public final State goStart() { + return mgr.goStart(); + } + + /** + * Transitions to the "query" state. + * + * @return the new state + */ + public State goQuery() { + return mgr.goQuery(); + } + + /** + * Goes active with a new set of assignments. + * + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment + */ + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn != null && asgn.hasAssignment(getHost())) { + return mgr.goActive(); + + } else { + return goInactive(); + } + } + + /** + * Transitions to the "inactive" state. + * + * @return the new state + */ + protected State goInactive() { + return mgr.goInactive(); + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Heartbeat msg) { + logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Identification msg) { + logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method copies the assignments and then returns + * {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Leader msg) { + if (isValid(msg)) { + logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + } + + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Determines if a message is valid and did not originate from this host. + * + * @param msg message to be validated + * @return {@code true} if the message is valid, {@code false} otherwise + */ + protected boolean isValid(Leader msg) { + BucketAssignments asgn = msg.getAssignments(); + if (asgn == null) { + logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic()); + return false; + } + + // ignore Leader messages from ourself + String source = msg.getSource(); + if (source == null || source.equals(getHost())) { + logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic()); + return false; + } + + // the new leader must equal the source + boolean result = source.equals(asgn.getLeader()); + + if (!result) { + logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic()); + } + + return result; + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Identification msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Leader msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Offline msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Query msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message on the specified channel. + * + * @param channel channel + * @param msg message to be published + */ + protected final void publish(String channel, Heartbeat msg) { + mgr.publish(channel, msg); + } + + /** + * Starts distributing messages using the specified bucket assignments. + * + * @param assignments assignments + */ + protected final void startDistributing(BucketAssignments assignments) { + if (assignments != null) { + mgr.startDistributing(assignments); + } + } + + /** + * Schedules a timer to fire after a delay. + * + * @param delayMs delay in ms + * @param task task + */ + protected final void schedule(long delayMs, StateTimerTask task) { + timers.add(mgr.schedule(delayMs, task)); + } + + /** + * Schedules a timer to fire repeatedly. + * + * @param initialDelayMs initial delay ms + * @param delayMs delay ms + * @param task task + */ + protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); + } + + /** + * Indicates that we failed to see our own heartbeat; must be a problem with the + * internal topic. Assumes the problem is temporary and continues to use the current + * bucket assignments. + * + * @return a new {@link StartState} + */ + protected final State missedHeartbeat() { + publish(makeOffline()); + + return mgr.goStart(); + } + + /** + * Indicates that the internal topic failed; this should only be invoked from the + * StartState. Discards bucket assignments and begins processing everything locally. + * + * @return a new {@link InactiveState} + */ + protected final State internalTopicFailed() { + publish(makeOffline()); + mgr.startDistributing(null); + + return mgr.goInactive(); + } + + /** + * Makes a heart beat message. + * + * @param timestampMs time, in milliseconds, associated with the message + * + * @return a new message + */ + protected final Heartbeat makeHeartbeat(long timestampMs) { + return new Heartbeat(getHost(), timestampMs); + } + + /** + * Makes an Identification message. + * + * @return a new message + */ + protected Identification makeIdentification() { + return new Identification(getHost(), getAssignments()); + } + + /** + * Makes an "offline" message. + * + * @return a new message + */ + protected final Offline makeOffline() { + return new Offline(getHost()); + } + + /** + * Makes a query message. + * + * @return a new message + */ + protected final Query makeQuery() { + return new Query(getHost()); + } + + public final BucketAssignments getAssignments() { + return mgr.getAssignments(); + } + + public final String getHost() { + return mgr.getHost(); + } + + public final String getTopic() { + return mgr.getTopic(); + } + + public final PoolingProperties getProperties() { + return mgr.getProperties(); + } +} |