diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java | 291 |
1 files changed, 171 insertions, 120 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index cd71670d..422efdd7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -38,6 +39,7 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.state.ActiveState; import org.onap.policy.drools.pooling.state.IdleState; import org.onap.policy.drools.pooling.state.InactiveState; @@ -52,39 +54,30 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; /** - * Implementation of a {@link PoolingManager}. Until bucket assignments have - * been made, events coming from external topics are saved in a queue for later - * processing. Once assignments are made, the saved events are processed. In - * addition, while the controller is locked, events are still forwarded to other - * hosts and bucket assignments are still updated, based on any {@link Leader} - * messages that it receives. + * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, + * events coming from external topics are saved in a queue for later processing. Once + * assignments are made, the saved events are processed. In addition, while the controller + * is locked, events are still forwarded to other hosts and bucket assignments are still + * updated, based on any {@link Leader} messages that it receives. */ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class); - // TODO metrics, audit logging - /** * Maximum number of times a message can be forwarded. */ public static final int MAX_HOPS = 5; /** - * Type of item that the extractors will be extracting. - */ - private static final String EXTRACTOR_TYPE = "requestId"; - - /** - * Prefix for extractor properties. + * Factory used to create various objects. Can be overridden during junit testing. */ - private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; + private static Factory factory = new Factory(); /** - * Factory used to create various objects. Can be overridden during junit - * testing. + * ID of the last host that was created. */ - private static Factory factory = new Factory(); + private static final AtomicReference<String> lastHost = new AtomicReference<>(null); /** * ID of this host. @@ -102,14 +95,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final PolicyController controller; /** - * Where to offer events that have been forwarded to this host (i.e, the - * controller). + * Where to offer events that have been forwarded to this host (i.e, the controller). */ private final TopicListener listener; /** - * Used to encode & decode request objects received from & sent to a rule - * engine. + * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -129,19 +120,18 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final ClassExtractors extractors; /** - * Lock used while updating {@link #current}. In general, public methods - * must use this, while private methods assume the lock is already held. + * Lock used while updating {@link #current}. In general, public methods must use + * this, while private methods assume the lock is already held. */ private final Object curLocker = new Object(); /** * Current state. * <p> - * This uses a finite state machine, wherein the state object contains all - * of the data relevant to that state. Each state object has a process() - * method, specific to each type of {@link Message} subclass. The method - * returns the next state object, or {@code null} if the state is to remain - * the same. + * This uses a finite state machine, wherein the state object contains all of the data + * relevant to that state. Each state object has a process() method, specific to each + * type of {@link Message} subclass. The method returns the next state object, or + * {@code null} if the state is to remain the same. */ private State current; @@ -177,13 +167,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.controller = controller; this.props = props; + lastHost.set(this.host); + try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName()); + SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), + props.getSource()); this.extractors = factory.makeClassExtractors(spec); this.dmaapMgr = factory.makeDmaapManager(props); @@ -197,7 +190,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (PoolingFeatureException e) { logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName()); - throw e.toRuntimeException(); + throw new PoolingFeatureRtException(e); } } @@ -210,6 +203,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Used by junit tests. + * + * @return the ID of the last host that was created, or {@code null} if no hosts have + * been created yet + */ + protected static String getLastHost() { + return lastHost.get(); + } + + /** * Should only be used by junit tests. * * @return the current state @@ -234,22 +237,22 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to start. Starts the publisher for - * the internal topic, and creates a thread pool for the timers. + * Indicates that the controller is about to start. Starts the publisher for the + * internal topic, and creates a thread pool for the timers. * - * @throws PoolingFeatureException if the internal topic publisher cannot be - * started + * @throws PoolingFeatureException if the internal topic publisher cannot be started */ public void beforeStart() throws PoolingFeatureException { synchronized (curLocker) { if (scheduler == null) { dmaapMgr.startPublisher(); + logger.debug("make scheduler thread for topic {}", getTopic()); scheduler = factory.makeScheduler(); /* - * Only a handful of timers at any moment, thus we can afford to - * take the time to remove them when they're cancelled. + * Only a handful of timers at any moment, thus we can afford to take the + * time to remove them when they're cancelled. */ scheduler.setRemoveOnCancelPolicy(true); scheduler.setMaximumPoolSize(1); @@ -260,9 +263,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller has successfully started. Starts the - * consumer for the internal topic, enters the {@link StartState}, and sets - * the filter for the initial state. + * Indicates that the controller has successfully started. Starts the consumer for the + * internal topic, enters the {@link StartState}, and sets the filter for the initial + * state. */ public void afterStart() { synchronized (curLocker) { @@ -274,8 +277,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to stop. Stops the consumer, the - * scheduler, and the current state. + * Indicates that the controller is about to stop. Stops the consumer, the scheduler, + * and the current state. */ public void beforeStop() { ScheduledThreadPoolExecutor sched; @@ -287,23 +290,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { if (!(current instanceof IdleState)) { dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); - - // TODO - /* - * Need a brief delay here to allow "offline" message to be - * transmitted? - */ + publishAdmin(new Offline(getHost())); } } if (sched != null) { + logger.debug("stop scheduler for topic {}", getTopic()); sched.shutdownNow(); } } /** - * Indicates that the controller has stopped. Stops the publisher and logs a - * warning if any events are still in the queue. + * Indicates that the controller has stopped. Stops the publisher and logs a warning + * if any events are still in the queue. */ public void afterStop() { synchronized (curLocker) { @@ -312,25 +311,33 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { eventq.clear(); } - dmaapMgr.stopPublisher(); + /* + * stop the publisher, but allow time for any Offline message to be + * transmitted + */ + dmaapMgr.stopPublisher(props.getOfflinePubWaitMs()); } } /** - * Indicates that the controller is about to be locked. Enters the idle - * state, as all it will be doing is forwarding messages. + * Indicates that the controller is about to be locked. Enters the idle state, as all + * it will be doing is forwarding messages. */ public void beforeLock() { + logger.info("locking manager for topic {}", getTopic()); + synchronized (curLocker) { changeState(new IdleState(this)); } } /** - * Indicates that the controller has been unlocked. Enters the start state, - * if the controller is running. + * Indicates that the controller has been unlocked. Enters the start state, if the + * controller is running. */ public void afterUnlock() { + logger.info("unlocking manager for topic {}", getTopic()); + synchronized (curLocker) { if (controller.isAlive() && current instanceof IdleState && scheduler != null) { changeState(new StartState(this)); @@ -339,8 +346,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Changes the finite state machine to a new state, provided the new state - * is not {@code null}. + * Changes the finite state machine to a new state, provided the new state is not + * {@code null}. * * @param newState new state, or {@code null} if to remain unchanged */ @@ -379,30 +386,46 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { CountDownLatch latch = new CountDownLatch(1); /* - * We don't want to build up items in our queue if we can't forward them - * to other hosts, so we just stop the controller. + * We don't want to build up items in our queue if we can't forward them to other + * hosts, so we just stop the controller. * * Use a background thread to prevent deadlocks. */ - new Thread() { - @Override - public void run() { - controller.stop(); - latch.countDown(); - } - }.start(); + new Thread(() -> { + controller.stop(); + latch.countDown(); + }).start(); return latch; } @Override - public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) { - return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override - public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { - return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, + TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override @@ -412,6 +435,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public void publish(String channel, Message msg) { + logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic()); + msg.setChannel(channel); try { @@ -434,8 +459,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param topic2 * @param event - * @return {@code true} if the event was handled, {@code false} if the - * controller should handle it + * @return {@code true} if the event was handled, {@code false} if the controller + * should handle it */ @Override public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { @@ -452,20 +477,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the PolicyController before it offers the event to the - * DroolsController. If the controller is locked, then it isn't processing - * events. However, they still need to be forwarded, thus in that case, they - * are decoded and forwarded. + * Called by the PolicyController before it offers the event to the DroolsController. + * If the controller is locked, then it isn't processing events. However, they still + * need to be forwarded, thus in that case, they are decoded and forwarded. * <p> - * On the other hand, if the controller is not locked, then we just return - * immediately and let {@link #beforeInsert(Object, String, String, Object) - * beforeInsert()} handle it instead, as it already has the decoded message. + * On the other hand, if the controller is not locked, then we just return immediately + * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle + * it instead, as it already has the decoded message. * * @param protocol * @param topic2 * @param event - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { @@ -478,15 +502,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the DroolsController before it inserts the event into the rule - * engine. + * Called by the DroolsController before it inserts the event into the rule engine. * * @param protocol * @param topic2 * @param event original event text, as received from the Bus * @param event2 event, as an object - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) { @@ -504,10 +527,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param protocol * @param topic2 * @param event - * @param reqid request id extracted from the event, or {@code null} if it - * couldn't be extracted - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @param reqid request id extracted from the event, or {@code null} if it couldn't be + * extracted + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) { if (reqid == null) { @@ -524,6 +547,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { Forward ev = makeForward(protocol, topic2, event, reqid); if (ev == null) { // invalid args - consume the message + logger.warn("constructed an invalid Forward message on topic {}", getTopic()); return true; } @@ -536,12 +560,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles an event from an external topic. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleExternal(Forward event) { if (assignments == null) { // no bucket assignments yet - add it to the queue + logger.info("queued event for request {}", event.getRequestId()); eventq.add(event); // we've consumed the event @@ -556,17 +581,17 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles a {@link Forward} event, possibly forwarding it again. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleEvent(Forward event) { - int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size(); - String target = assignments.getAssignedHost(bucket); + String target = assignments.getAssignedHost(event.getRequestId().hashCode()); if (target == null) { /* * This bucket has no assignment - just discard the event */ + logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic()); return true; } @@ -574,6 +599,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /* * Message belongs to this host - allow the controller to handle it. */ + logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic()); return false; } @@ -583,6 +609,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { + logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -678,16 +705,21 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param event */ private void inject(Forward event) { - intercept = false; - listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic()); - intercept = true; + try { + intercept = false; + listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + + } finally { + intercept = true; + } } /** - * Handles an event from the internal topic. This uses reflection to - * identify the appropriate process() method to invoke, based on the type of - * Message that was decoded. + * Handles an event from the internal topic. This uses reflection to identify the + * appropriate process() method to invoke, based on the type of Message that was + * decoded. * * @param event the serialized {@link Message} read from the internal topic */ @@ -711,29 +743,48 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (NoSuchMethodException | SecurityException e) { logger.error("no processor for message {} for topic {}", clazz, topic, e); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - logger.error("failed to process message {} for topic {}", clazz, topic, e); - - } catch (PoolingFeatureException e) { + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | PoolingFeatureException e) { logger.error("failed to process message {} for topic {}", clazz, topic, e); } } @Override - public void startDistributing(BucketAssignments assignments) { - if (assignments == null) { - return; + public CountDownLatch startDistributing(BucketAssignments asgn) { + if (asgn == null) { + return null; } + logger.info("new assignments for topic {}", getTopic()); + synchronized (curLocker) { - this.assignments = assignments; + assignments = asgn; + } + + /* + * publish the events from the event queue, but do it in a background thread so + * that the state machine can enter its correct state BEFORE we start processing + * the events + */ + CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + synchronized (curLocker) { + if (assignments == null) { + return; + } - // now that we have assignments, we can process the queue - Forward ev; - while ((ev = eventq.poll()) != null) { - handle(ev); + // now that we have assignments, we can process the queue + Forward ev; + while ((ev = eventq.poll()) != null) { + handle(ev); + } + + latch.countDown(); } - } + }).start(); + + return latch; } @Override @@ -762,8 +813,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Action to run a timer task. Only runs the task if the machine is still in - * the state that it was in when the timer was created. + * Action to run a timer task. Only runs the task if the machine is still in the state + * that it was in when the timer was created. */ private class TimerAction implements Runnable { @@ -790,7 +841,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { public void run() { synchronized (curLocker) { if (current == origState) { - changeState(task.fire(null)); + changeState(task.fire()); } } } @@ -818,7 +869,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @return a new set of extractors */ public ClassExtractors makeClassExtractors(Properties props) { - return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE); + return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX, + PoolingProperties.EXTRACTOR_TYPE); } /** @@ -846,8 +898,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param drools drools controller * @param topic topic on which the event was received - * @return {@code true} if the event can be decoded, {@code false} - * otherwise + * @return {@code true} if the event can be decoded, {@code false} otherwise */ public boolean canDecodeEvent(DroolsController drools, String topic) { return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic); |