diff options
Diffstat (limited to 'feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r-- | feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java | 647 |
1 files changed, 647 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java new file mode 100644 index 00000000..7c0436eb --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -0,0 +1,647 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import com.google.gson.JsonParseException; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.state.ActiveState; +import org.onap.policy.drools.pooling.state.IdleState; +import org.onap.policy.drools.pooling.state.InactiveState; +import org.onap.policy.drools.pooling.state.QueryState; +import org.onap.policy.drools.pooling.state.StartState; +import org.onap.policy.drools.pooling.state.State; +import org.onap.policy.drools.pooling.state.StateTimerTask; +import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants; +import org.onap.policy.drools.system.PolicyController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, + * events coming from external topics are saved in a queue for later processing. Once + * assignments are made, the saved events are processed. In addition, while the controller + * is locked, events are still forwarded to other hosts and bucket assignments are still + * updated, based on any {@link Leader} messages that it receives. + */ +public class PoolingManagerImpl implements PoolingManager, TopicListener { + + private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class); + + /** + * Maximum number of times a message can be forwarded. + */ + public static final int MAX_HOPS = 5; + + /** + * ID of this host. + */ + @Getter + private final String host; + + /** + * Properties with which this was configured. + */ + @Getter + private final PoolingProperties properties; + + /** + * Associated controller. + */ + private final PolicyController controller; + + /** + * Decremented each time the manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch; + + /** + * Used to encode & decode request objects received from & sent to a rule engine. + */ + private final Serializer serializer; + + /** + * Internal DMaaP topic used by this controller. + */ + @Getter + private final String topic; + + /** + * Manager for the internal DMaaP topic. + */ + private final TopicMessageManager topicMessageManager; + + /** + * Lock used while updating {@link #current}. In general, public methods must use + * this, while private methods assume the lock is already held. + */ + private final Object curLocker = new Object(); + + /** + * Current state. + * + * <p>This uses a finite state machine, wherein the state object contains all of the data + * relevant to that state. Each state object has a process() method, specific to each + * type of {@link Message} subclass. The method returns the next state object, or + * {@code null} if the state is to remain the same. + */ + private State current; + + /** + * Current bucket assignments or {@code null}. + */ + @Getter + private BucketAssignments assignments = null; + + /** + * Pool used to execute timers. + */ + private ScheduledThreadPoolExecutor scheduler = null; + + /** + * Constructs the manager, initializing all the data structures. + * + * @param host name/uuid of this host + * @param controller controller with which this is associated + * @param props feature properties specific to the controller + * @param activeLatch latch to be decremented each time the manager enters the Active + * state + */ + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + this.host = host; + this.controller = controller; + this.properties = props; + this.activeLatch = activeLatch; + + try { + this.serializer = new Serializer(); + this.topic = props.getPoolingTopic(); + this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic()); + this.current = new IdleState(this); + + logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); + + } catch (ClassCastException e) { + logger.error("not a topic listener, controller {}", controller.getName()); + throw new PoolingFeatureRtException(e); + + } catch (PoolingFeatureException e) { + logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName()); + throw new PoolingFeatureRtException(e); + } + } + + /** + * Should only be used by junit tests. + * + * @return the current state + */ + protected State getCurrent() { + synchronized (curLocker) { + return current; + } + } + + /** + * Indicates that the controller is about to start. Starts the publisher for the + * internal topic, and creates a thread pool for the timers. + */ + public void beforeStart() { + synchronized (curLocker) { + if (scheduler == null) { + topicMessageManager.startPublisher(); + + logger.debug("make scheduler thread for topic {}", getTopic()); + scheduler = makeScheduler(); + + /* + * Only a handful of timers at any moment, thus we can afford to take the + * time to remove them when they're cancelled. + */ + scheduler.setRemoveOnCancelPolicy(true); + scheduler.setMaximumPoolSize(1); + scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + } + } + } + + /** + * Indicates that the controller has successfully started. Starts the consumer for the + * internal topic, enters the {@link StartState}, and sets the filter for the initial + * state. + */ + public void afterStart() { + synchronized (curLocker) { + if (current instanceof IdleState) { + topicMessageManager.startConsumer(this); + changeState(new StartState(this)); + } + } + } + + /** + * Indicates that the controller is about to stop. Stops the consumer, the scheduler, + * and the current state. + */ + public void beforeStop() { + ScheduledThreadPoolExecutor sched; + + synchronized (curLocker) { + sched = scheduler; + scheduler = null; + + if (!(current instanceof IdleState)) { + changeState(new IdleState(this)); + topicMessageManager.stopConsumer(this); + publishAdmin(new Offline(getHost())); + } + + assignments = null; + } + + if (sched != null) { + logger.debug("stop scheduler for topic {}", getTopic()); + sched.shutdownNow(); + } + } + + /** + * Indicates that the controller has stopped. Stops the publisher and logs a warning + * if any events are still in the queue. + */ + public void afterStop() { + synchronized (curLocker) { + /* + * stop the publisher, but allow time for any Offline message to be + * transmitted + */ + topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs()); + } + } + + /** + * Indicates that the controller is about to be locked. Enters the idle state, as all + * it will be doing is forwarding messages. + */ + public void beforeLock() { + logger.info("locking manager for topic {}", getTopic()); + + synchronized (curLocker) { + changeState(new IdleState(this)); + } + } + + /** + * Indicates that the controller has been unlocked. Enters the start state, if the + * controller is running. + */ + public void afterUnlock() { + logger.info("unlocking manager for topic {}", getTopic()); + + synchronized (curLocker) { + if (controller.isAlive() && current instanceof IdleState && scheduler != null) { + changeState(new StartState(this)); + } + } + } + + /** + * Changes the finite state machine to a new state, provided the new state is not + * {@code null}. + * + * @param newState new state, or {@code null} if to remain unchanged + */ + private void changeState(State newState) { + if (newState != null) { + current.cancelTimers(); + current = newState; + + newState.start(); + } + } + + @Override + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return () -> fut.cancel(false); + } + + @Override + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, + TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return () -> fut.cancel(false); + } + + @Override + public void publishAdmin(Message msg) { + publish(Message.ADMIN, msg); + } + + @Override + public void publish(String channel, Message msg) { + logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic()); + + msg.setChannel(channel); + + try { + // ensure it's valid before we send it + msg.checkValidity(); + + String txt = serializer.encodeMsg(msg); + topicMessageManager.publish(txt); + + } catch (JsonParseException e) { + logger.error("failed to serialize message for topic {} channel {}", topic, channel, e); + + } catch (PoolingFeatureException e) { + logger.error("failed to publish message for topic {} channel {}", topic, channel, e); + } + } + + /** + * Handles an event from the internal topic. + * + * @param commType comm infrastructure + * @param topic2 topic + * @param event event + */ + @Override + public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { + + if (event == null) { + logger.error("null event on topic {}", topic); + return; + } + + synchronized (curLocker) { + // it's on the internal topic + handleInternal(event); + } + } + + /** + * Called by the PolicyController before it offers the event to the DroolsController. + * If the controller is locked, then it isn't processing events. However, they still + * need to be forwarded, thus in that case, they are decoded and forwarded. + * + * <p>On the other hand, if the controller is not locked, then we just return immediately + * and let {@link #beforeInsert(String, Object) beforeInsert()} handle + * it instead, as it already has the decoded message. + * + * @param topic2 topic + * @param event event + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + public boolean beforeOffer(String topic2, String event) { + + if (!controller.isLocked()) { + // we should NOT intercept this message - let the invoker handle it + return false; + } + + return handleExternal(topic2, decodeEvent(topic2, event)); + } + + /** + * Called by the DroolsController before it inserts the event into the rule engine. + * + * @param topic2 topic + * @param event event, as an object + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + public boolean beforeInsert(String topic2, Object event) { + return handleExternal(topic2, event); + } + + /** + * Handles an event from an external topic. + * + * @param topic2 topic + * @param event event, as an object, or {@code null} if it cannot be decoded + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + private boolean handleExternal(String topic2, Object event) { + if (event == null) { + // no event - let the invoker handle it + return false; + } + + synchronized (curLocker) { + return handleExternal(topic2, event, event.hashCode()); + } + } + + /** + * Handles an event from an external topic. + * + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it + */ + private boolean handleExternal(String topic2, Object event, int eventHashCode) { + if (assignments == null) { + // no bucket assignments yet - handle locally + logger.info("handle event locally for request {}", event); + + // we did NOT consume the event + return false; + + } else { + return handleEvent(topic2, event, eventHashCode); + } + } + + /** + * Handles a {@link Forward} event, possibly forwarding it again. + * + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it + */ + private boolean handleEvent(String topic2, Object event, int eventHashCode) { + String target = assignments.getAssignedHost(eventHashCode); + + if (target == null) { + /* + * This bucket has no assignment - just discard the event + */ + logger.warn("discarded event for unassigned bucket from topic {}", topic2); + return true; + } + + if (target.equals(host)) { + /* + * Message belongs to this host - allow the controller to handle it. + */ + logger.info("handle local event for request {} from topic {}", event, topic2); + return false; + } + + // not our message, consume the event + logger.warn("discarded event for host {} from topic {}", target, topic2); + return true; + } + + /** + * Decodes an event from a String into an event Object. + * + * @param topic2 topic + * @param event event + * @return the decoded event object, or {@code null} if it can't be decoded + */ + private Object decodeEvent(String topic2, String event) { + DroolsController drools = controller.getDrools(); + + // check if this topic has a decoder + + if (!canDecodeEvent(drools, topic2)) { + + logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(), + drools.getArtifactId()); + return null; + } + + // decode + + try { + return decodeEventWrapper(drools, topic2, event); + + } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) { + logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e); + return null; + } + } + + /** + * Handles an event from the internal topic. This uses reflection to identify the + * appropriate process() method to invoke, based on the type of Message that was + * decoded. + * + * @param event the serialized {@link Message} read from the internal topic + */ + private void handleInternal(String event) { + Class<?> clazz = null; + + try { + Message msg = serializer.decodeMsg(event); + + // get the class BEFORE checking the validity + clazz = msg.getClass(); + + msg.checkValidity(); + + var meth = current.getClass().getMethod("process", msg.getClass()); + changeState((State) meth.invoke(current, msg)); + + } catch (JsonParseException e) { + logger.warn("failed to decode message for topic {}", topic, e); + + } catch (NoSuchMethodException | SecurityException e) { + logger.error("no processor for message {} for topic {}", clazz, topic, e); + + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | PoolingFeatureException e) { + logger.error("failed to process message {} for topic {}", clazz, topic, e); + } + } + + @Override + public void startDistributing(BucketAssignments asgn) { + synchronized (curLocker) { + int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); + logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); + assignments = asgn; + } + } + + @Override + public State goStart() { + return new StartState(this); + } + + @Override + public State goQuery() { + return new QueryState(this); + } + + @Override + public State goActive() { + activeLatch.countDown(); + return new ActiveState(this); + } + + @Override + public State goInactive() { + return new InactiveState(this); + } + + /** + * Action to run a timer task. Only runs the task if the machine is still in the state + * that it was in when the timer was created. + */ + private class TimerAction implements Runnable { + + /** + * State of the machine when the timer was created. + */ + private State origState; + + /** + * Task to be executed. + */ + private StateTimerTask task; + + /** + * Constructor. + * + * @param task task to execute when this timer runs + */ + public TimerAction(StateTimerTask task) { + this.origState = current; + this.task = task; + } + + @Override + public void run() { + synchronized (curLocker) { + if (current == origState) { + changeState(task.fire()); + } + } + } + } + + /** + * Creates a DMaaP manager. + * + * @param topic name of the internal DMaaP topic + * @return a new topic messages manager + * @throws PoolingFeatureException if an error occurs + */ + protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException { + return new TopicMessageManager(topic); + } + + /** + * Creates a scheduled thread pool. + * + * @return a new scheduled thread pool + */ + protected ScheduledThreadPoolExecutor makeScheduler() { + return new ScheduledThreadPoolExecutor(1); + } + + /** + * Determines if the event can be decoded. + * + * @param drools drools controller + * @param topic topic on which the event was received + * @return {@code true} if the event can be decoded, {@code false} otherwise + */ + protected boolean canDecodeEvent(DroolsController drools, String topic) { + return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), + topic); + } + + /** + * Decodes the event. + * + * @param drools drools controller + * @param topic topic on which the event was received + * @param event event text to be decoded + * @return the decoded event + * @throws IllegalArgumentException illegal argument + * @throws UnsupportedOperationException unsupported operation + * @throws IllegalStateException illegal state + */ + protected Object decodeEventWrapper(DroolsController drools, String topic, String event) { + return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic, + event); + } +} |