aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java647
1 files changed, 647 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
new file mode 100644
index 00000000..7c0436eb
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -0,0 +1,647 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import com.google.gson.JsonParseException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
+import org.onap.policy.drools.system.PolicyController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
+ * events coming from external topics are saved in a queue for later processing. Once
+ * assignments are made, the saved events are processed. In addition, while the controller
+ * is locked, events are still forwarded to other hosts and bucket assignments are still
+ * updated, based on any {@link Leader} messages that it receives.
+ */
+public class PoolingManagerImpl implements PoolingManager, TopicListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
+
+ /**
+ * Maximum number of times a message can be forwarded.
+ */
+ public static final int MAX_HOPS = 5;
+
+ /**
+ * ID of this host.
+ */
+ @Getter
+ private final String host;
+
+ /**
+ * Properties with which this was configured.
+ */
+ @Getter
+ private final PoolingProperties properties;
+
+ /**
+ * Associated controller.
+ */
+ private final PolicyController controller;
+
+ /**
+ * Decremented each time the manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch;
+
+ /**
+ * Used to encode & decode request objects received from & sent to a rule engine.
+ */
+ private final Serializer serializer;
+
+ /**
+ * Internal DMaaP topic used by this controller.
+ */
+ @Getter
+ private final String topic;
+
+ /**
+ * Manager for the internal DMaaP topic.
+ */
+ private final TopicMessageManager topicMessageManager;
+
+ /**
+ * Lock used while updating {@link #current}. In general, public methods must use
+ * this, while private methods assume the lock is already held.
+ */
+ private final Object curLocker = new Object();
+
+ /**
+ * Current state.
+ *
+ * <p>This uses a finite state machine, wherein the state object contains all of the data
+ * relevant to that state. Each state object has a process() method, specific to each
+ * type of {@link Message} subclass. The method returns the next state object, or
+ * {@code null} if the state is to remain the same.
+ */
+ private State current;
+
+ /**
+ * Current bucket assignments or {@code null}.
+ */
+ @Getter
+ private BucketAssignments assignments = null;
+
+ /**
+ * Pool used to execute timers.
+ */
+ private ScheduledThreadPoolExecutor scheduler = null;
+
+ /**
+ * Constructs the manager, initializing all the data structures.
+ *
+ * @param host name/uuid of this host
+ * @param controller controller with which this is associated
+ * @param props feature properties specific to the controller
+ * @param activeLatch latch to be decremented each time the manager enters the Active
+ * state
+ */
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ this.host = host;
+ this.controller = controller;
+ this.properties = props;
+ this.activeLatch = activeLatch;
+
+ try {
+ this.serializer = new Serializer();
+ this.topic = props.getPoolingTopic();
+ this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
+ this.current = new IdleState(this);
+
+ logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
+
+ } catch (ClassCastException e) {
+ logger.error("not a topic listener, controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+ }
+ }
+
+ /**
+ * Should only be used by junit tests.
+ *
+ * @return the current state
+ */
+ protected State getCurrent() {
+ synchronized (curLocker) {
+ return current;
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to start. Starts the publisher for the
+ * internal topic, and creates a thread pool for the timers.
+ */
+ public void beforeStart() {
+ synchronized (curLocker) {
+ if (scheduler == null) {
+ topicMessageManager.startPublisher();
+
+ logger.debug("make scheduler thread for topic {}", getTopic());
+ scheduler = makeScheduler();
+
+ /*
+ * Only a handful of timers at any moment, thus we can afford to take the
+ * time to remove them when they're cancelled.
+ */
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setMaximumPoolSize(1);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller has successfully started. Starts the consumer for the
+ * internal topic, enters the {@link StartState}, and sets the filter for the initial
+ * state.
+ */
+ public void afterStart() {
+ synchronized (curLocker) {
+ if (current instanceof IdleState) {
+ topicMessageManager.startConsumer(this);
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
+ * and the current state.
+ */
+ public void beforeStop() {
+ ScheduledThreadPoolExecutor sched;
+
+ synchronized (curLocker) {
+ sched = scheduler;
+ scheduler = null;
+
+ if (!(current instanceof IdleState)) {
+ changeState(new IdleState(this));
+ topicMessageManager.stopConsumer(this);
+ publishAdmin(new Offline(getHost()));
+ }
+
+ assignments = null;
+ }
+
+ if (sched != null) {
+ logger.debug("stop scheduler for topic {}", getTopic());
+ sched.shutdownNow();
+ }
+ }
+
+ /**
+ * Indicates that the controller has stopped. Stops the publisher and logs a warning
+ * if any events are still in the queue.
+ */
+ public void afterStop() {
+ synchronized (curLocker) {
+ /*
+ * stop the publisher, but allow time for any Offline message to be
+ * transmitted
+ */
+ topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to be locked. Enters the idle state, as all
+ * it will be doing is forwarding messages.
+ */
+ public void beforeLock() {
+ logger.info("locking manager for topic {}", getTopic());
+
+ synchronized (curLocker) {
+ changeState(new IdleState(this));
+ }
+ }
+
+ /**
+ * Indicates that the controller has been unlocked. Enters the start state, if the
+ * controller is running.
+ */
+ public void afterUnlock() {
+ logger.info("unlocking manager for topic {}", getTopic());
+
+ synchronized (curLocker) {
+ if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Changes the finite state machine to a new state, provided the new state is not
+ * {@code null}.
+ *
+ * @param newState new state, or {@code null} if to remain unchanged
+ */
+ private void changeState(State newState) {
+ if (newState != null) {
+ current.cancelTimers();
+ current = newState;
+
+ newState.start();
+ }
+ }
+
+ @Override
+ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return () -> fut.cancel(false);
+ }
+
+ @Override
+ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
+ TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return () -> fut.cancel(false);
+ }
+
+ @Override
+ public void publishAdmin(Message msg) {
+ publish(Message.ADMIN, msg);
+ }
+
+ @Override
+ public void publish(String channel, Message msg) {
+ logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
+
+ msg.setChannel(channel);
+
+ try {
+ // ensure it's valid before we send it
+ msg.checkValidity();
+
+ String txt = serializer.encodeMsg(msg);
+ topicMessageManager.publish(txt);
+
+ } catch (JsonParseException e) {
+ logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic.
+ *
+ * @param commType comm infrastructure
+ * @param topic2 topic
+ * @param event event
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
+
+ if (event == null) {
+ logger.error("null event on topic {}", topic);
+ return;
+ }
+
+ synchronized (curLocker) {
+ // it's on the internal topic
+ handleInternal(event);
+ }
+ }
+
+ /**
+ * Called by the PolicyController before it offers the event to the DroolsController.
+ * If the controller is locked, then it isn't processing events. However, they still
+ * need to be forwarded, thus in that case, they are decoded and forwarded.
+ *
+ * <p>On the other hand, if the controller is not locked, then we just return immediately
+ * and let {@link #beforeInsert(String, Object) beforeInsert()} handle
+ * it instead, as it already has the decoded message.
+ *
+ * @param topic2 topic
+ * @param event event
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ public boolean beforeOffer(String topic2, String event) {
+
+ if (!controller.isLocked()) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(topic2, decodeEvent(topic2, event));
+ }
+
+ /**
+ * Called by the DroolsController before it inserts the event into the rule engine.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param topic2 topic
+ * @param event event, as an object, or {@code null} if it cannot be decoded
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
+ return false;
+ }
+
+ synchronized (curLocker) {
+ return handleExternal(topic2, event, event.hashCode());
+ }
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
+ */
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
+ if (assignments == null) {
+ // no bucket assignments yet - handle locally
+ logger.info("handle event locally for request {}", event);
+
+ // we did NOT consume the event
+ return false;
+
+ } else {
+ return handleEvent(topic2, event, eventHashCode);
+ }
+ }
+
+ /**
+ * Handles a {@link Forward} event, possibly forwarding it again.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
+ */
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
+
+ if (target == null) {
+ /*
+ * This bucket has no assignment - just discard the event
+ */
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
+ return true;
+ }
+
+ if (target.equals(host)) {
+ /*
+ * Message belongs to this host - allow the controller to handle it.
+ */
+ logger.info("handle local event for request {} from topic {}", event, topic2);
+ return false;
+ }
+
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
+ return true;
+ }
+
+ /**
+ * Decodes an event from a String into an event Object.
+ *
+ * @param topic2 topic
+ * @param event event
+ * @return the decoded event object, or {@code null} if it can't be decoded
+ */
+ private Object decodeEvent(String topic2, String event) {
+ DroolsController drools = controller.getDrools();
+
+ // check if this topic has a decoder
+
+ if (!canDecodeEvent(drools, topic2)) {
+
+ logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
+ drools.getArtifactId());
+ return null;
+ }
+
+ // decode
+
+ try {
+ return decodeEventWrapper(drools, topic2, event);
+
+ } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
+ logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic. This uses reflection to identify the
+ * appropriate process() method to invoke, based on the type of Message that was
+ * decoded.
+ *
+ * @param event the serialized {@link Message} read from the internal topic
+ */
+ private void handleInternal(String event) {
+ Class<?> clazz = null;
+
+ try {
+ Message msg = serializer.decodeMsg(event);
+
+ // get the class BEFORE checking the validity
+ clazz = msg.getClass();
+
+ msg.checkValidity();
+
+ var meth = current.getClass().getMethod("process", msg.getClass());
+ changeState((State) meth.invoke(current, msg));
+
+ } catch (JsonParseException e) {
+ logger.warn("failed to decode message for topic {}", topic, e);
+
+ } catch (NoSuchMethodException | SecurityException e) {
+ logger.error("no processor for message {} for topic {}", clazz, topic, e);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | PoolingFeatureException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+ }
+ }
+
+ @Override
+ public void startDistributing(BucketAssignments asgn) {
+ synchronized (curLocker) {
+ int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+ logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
+ assignments = asgn;
+ }
+ }
+
+ @Override
+ public State goStart() {
+ return new StartState(this);
+ }
+
+ @Override
+ public State goQuery() {
+ return new QueryState(this);
+ }
+
+ @Override
+ public State goActive() {
+ activeLatch.countDown();
+ return new ActiveState(this);
+ }
+
+ @Override
+ public State goInactive() {
+ return new InactiveState(this);
+ }
+
+ /**
+ * Action to run a timer task. Only runs the task if the machine is still in the state
+ * that it was in when the timer was created.
+ */
+ private class TimerAction implements Runnable {
+
+ /**
+ * State of the machine when the timer was created.
+ */
+ private State origState;
+
+ /**
+ * Task to be executed.
+ */
+ private StateTimerTask task;
+
+ /**
+ * Constructor.
+ *
+ * @param task task to execute when this timer runs
+ */
+ public TimerAction(StateTimerTask task) {
+ this.origState = current;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ synchronized (curLocker) {
+ if (current == origState) {
+ changeState(task.fire());
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a DMaaP manager.
+ *
+ * @param topic name of the internal DMaaP topic
+ * @return a new topic messages manager
+ * @throws PoolingFeatureException if an error occurs
+ */
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManager(topic);
+ }
+
+ /**
+ * Creates a scheduled thread pool.
+ *
+ * @return a new scheduled thread pool
+ */
+ protected ScheduledThreadPoolExecutor makeScheduler() {
+ return new ScheduledThreadPoolExecutor(1);
+ }
+
+ /**
+ * Determines if the event can be decoded.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @return {@code true} if the event can be decoded, {@code false} otherwise
+ */
+ protected boolean canDecodeEvent(DroolsController drools, String topic) {
+ return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
+ topic);
+ }
+
+ /**
+ * Decodes the event.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @param event event text to be decoded
+ * @return the decoded event
+ * @throws IllegalArgumentException illegal argument
+ * @throws UnsupportedOperationException unsupported operation
+ * @throws IllegalStateException illegal state
+ */
+ protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
+ return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,
+ event);
+ }
+}