summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java291
1 files changed, 171 insertions, 120 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index cd71670d..422efdd7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -38,6 +39,7 @@ import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.state.ActiveState;
import org.onap.policy.drools.pooling.state.IdleState;
import org.onap.policy.drools.pooling.state.InactiveState;
@@ -52,39 +54,30 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
- * Implementation of a {@link PoolingManager}. Until bucket assignments have
- * been made, events coming from external topics are saved in a queue for later
- * processing. Once assignments are made, the saved events are processed. In
- * addition, while the controller is locked, events are still forwarded to other
- * hosts and bucket assignments are still updated, based on any {@link Leader}
- * messages that it receives.
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
+ * events coming from external topics are saved in a queue for later processing. Once
+ * assignments are made, the saved events are processed. In addition, while the controller
+ * is locked, events are still forwarded to other hosts and bucket assignments are still
+ * updated, based on any {@link Leader} messages that it receives.
*/
public class PoolingManagerImpl implements PoolingManager, TopicListener {
private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
- // TODO metrics, audit logging
-
/**
* Maximum number of times a message can be forwarded.
*/
public static final int MAX_HOPS = 5;
/**
- * Type of item that the extractors will be extracting.
- */
- private static final String EXTRACTOR_TYPE = "requestId";
-
- /**
- * Prefix for extractor properties.
+ * Factory used to create various objects. Can be overridden during junit testing.
*/
- private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
+ private static Factory factory = new Factory();
/**
- * Factory used to create various objects. Can be overridden during junit
- * testing.
+ * ID of the last host that was created.
*/
- private static Factory factory = new Factory();
+ private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
/**
* ID of this host.
@@ -102,14 +95,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the
- * controller).
+ * Where to offer events that have been forwarded to this host (i.e, the controller).
*/
private final TopicListener listener;
/**
- * Used to encode & decode request objects received from & sent to a rule
- * engine.
+ * Used to encode & decode request objects received from & sent to a rule engine.
*/
private final Serializer serializer;
@@ -129,19 +120,18 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final ClassExtractors extractors;
/**
- * Lock used while updating {@link #current}. In general, public methods
- * must use this, while private methods assume the lock is already held.
+ * Lock used while updating {@link #current}. In general, public methods must use
+ * this, while private methods assume the lock is already held.
*/
private final Object curLocker = new Object();
/**
* Current state.
* <p>
- * This uses a finite state machine, wherein the state object contains all
- * of the data relevant to that state. Each state object has a process()
- * method, specific to each type of {@link Message} subclass. The method
- * returns the next state object, or {@code null} if the state is to remain
- * the same.
+ * This uses a finite state machine, wherein the state object contains all of the data
+ * relevant to that state. Each state object has a process() method, specific to each
+ * type of {@link Message} subclass. The method returns the next state object, or
+ * {@code null} if the state is to remain the same.
*/
private State current;
@@ -177,13 +167,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.controller = controller;
this.props = props;
+ lastHost.set(this.host);
+
try {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
- SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName());
+ SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
+ props.getSource());
this.extractors = factory.makeClassExtractors(spec);
this.dmaapMgr = factory.makeDmaapManager(props);
@@ -197,7 +190,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
} catch (PoolingFeatureException e) {
logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
- throw e.toRuntimeException();
+ throw new PoolingFeatureRtException(e);
}
}
@@ -210,6 +203,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
+ * Used by junit tests.
+ *
+ * @return the ID of the last host that was created, or {@code null} if no hosts have
+ * been created yet
+ */
+ protected static String getLastHost() {
+ return lastHost.get();
+ }
+
+ /**
* Should only be used by junit tests.
*
* @return the current state
@@ -234,22 +237,22 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller is about to start. Starts the publisher for
- * the internal topic, and creates a thread pool for the timers.
+ * Indicates that the controller is about to start. Starts the publisher for the
+ * internal topic, and creates a thread pool for the timers.
*
- * @throws PoolingFeatureException if the internal topic publisher cannot be
- * started
+ * @throws PoolingFeatureException if the internal topic publisher cannot be started
*/
public void beforeStart() throws PoolingFeatureException {
synchronized (curLocker) {
if (scheduler == null) {
dmaapMgr.startPublisher();
+ logger.debug("make scheduler thread for topic {}", getTopic());
scheduler = factory.makeScheduler();
/*
- * Only a handful of timers at any moment, thus we can afford to
- * take the time to remove them when they're cancelled.
+ * Only a handful of timers at any moment, thus we can afford to take the
+ * time to remove them when they're cancelled.
*/
scheduler.setRemoveOnCancelPolicy(true);
scheduler.setMaximumPoolSize(1);
@@ -260,9 +263,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller has successfully started. Starts the
- * consumer for the internal topic, enters the {@link StartState}, and sets
- * the filter for the initial state.
+ * Indicates that the controller has successfully started. Starts the consumer for the
+ * internal topic, enters the {@link StartState}, and sets the filter for the initial
+ * state.
*/
public void afterStart() {
synchronized (curLocker) {
@@ -274,8 +277,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller is about to stop. Stops the consumer, the
- * scheduler, and the current state.
+ * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
+ * and the current state.
*/
public void beforeStop() {
ScheduledThreadPoolExecutor sched;
@@ -287,23 +290,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
if (!(current instanceof IdleState)) {
dmaapMgr.stopConsumer(this);
changeState(new IdleState(this));
-
- // TODO
- /*
- * Need a brief delay here to allow "offline" message to be
- * transmitted?
- */
+ publishAdmin(new Offline(getHost()));
}
}
if (sched != null) {
+ logger.debug("stop scheduler for topic {}", getTopic());
sched.shutdownNow();
}
}
/**
- * Indicates that the controller has stopped. Stops the publisher and logs a
- * warning if any events are still in the queue.
+ * Indicates that the controller has stopped. Stops the publisher and logs a warning
+ * if any events are still in the queue.
*/
public void afterStop() {
synchronized (curLocker) {
@@ -312,25 +311,33 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
eventq.clear();
}
- dmaapMgr.stopPublisher();
+ /*
+ * stop the publisher, but allow time for any Offline message to be
+ * transmitted
+ */
+ dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
}
}
/**
- * Indicates that the controller is about to be locked. Enters the idle
- * state, as all it will be doing is forwarding messages.
+ * Indicates that the controller is about to be locked. Enters the idle state, as all
+ * it will be doing is forwarding messages.
*/
public void beforeLock() {
+ logger.info("locking manager for topic {}", getTopic());
+
synchronized (curLocker) {
changeState(new IdleState(this));
}
}
/**
- * Indicates that the controller has been unlocked. Enters the start state,
- * if the controller is running.
+ * Indicates that the controller has been unlocked. Enters the start state, if the
+ * controller is running.
*/
public void afterUnlock() {
+ logger.info("unlocking manager for topic {}", getTopic());
+
synchronized (curLocker) {
if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
changeState(new StartState(this));
@@ -339,8 +346,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Changes the finite state machine to a new state, provided the new state
- * is not {@code null}.
+ * Changes the finite state machine to a new state, provided the new state is not
+ * {@code null}.
*
* @param newState new state, or {@code null} if to remain unchanged
*/
@@ -379,30 +386,46 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
CountDownLatch latch = new CountDownLatch(1);
/*
- * We don't want to build up items in our queue if we can't forward them
- * to other hosts, so we just stop the controller.
+ * We don't want to build up items in our queue if we can't forward them to other
+ * hosts, so we just stop the controller.
*
* Use a background thread to prevent deadlocks.
*/
- new Thread() {
- @Override
- public void run() {
- controller.stop();
- latch.countDown();
- }
- }.start();
+ new Thread(() -> {
+ controller.stop();
+ latch.countDown();
+ }).start();
return latch;
}
@Override
- public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) {
- return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return new CancellableScheduledTask() {
+ @Override
+ public void cancel() {
+ fut.cancel(false);
+ }
+ };
}
@Override
- public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
- return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
+ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
+ TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return new CancellableScheduledTask() {
+ @Override
+ public void cancel() {
+ fut.cancel(false);
+ }
+ };
}
@Override
@@ -412,6 +435,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public void publish(String channel, Message msg) {
+ logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
+
msg.setChannel(channel);
try {
@@ -434,8 +459,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*
* @param topic2
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * controller should handle it
+ * @return {@code true} if the event was handled, {@code false} if the controller
+ * should handle it
*/
@Override
public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
@@ -452,20 +477,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Called by the PolicyController before it offers the event to the
- * DroolsController. If the controller is locked, then it isn't processing
- * events. However, they still need to be forwarded, thus in that case, they
- * are decoded and forwarded.
+ * Called by the PolicyController before it offers the event to the DroolsController.
+ * If the controller is locked, then it isn't processing events. However, they still
+ * need to be forwarded, thus in that case, they are decoded and forwarded.
* <p>
- * On the other hand, if the controller is not locked, then we just return
- * immediately and let {@link #beforeInsert(Object, String, String, Object)
- * beforeInsert()} handle it instead, as it already has the decoded message.
+ * On the other hand, if the controller is not locked, then we just return immediately
+ * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
+ * it instead, as it already has the decoded message.
*
* @param protocol
* @param topic2
* @param event
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
@@ -478,15 +502,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Called by the DroolsController before it inserts the event into the rule
- * engine.
+ * Called by the DroolsController before it inserts the event into the rule engine.
*
* @param protocol
* @param topic2
* @param event original event text, as received from the Bus
* @param event2 event, as an object
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
@@ -504,10 +527,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param protocol
* @param topic2
* @param event
- * @param reqid request id extracted from the event, or {@code null} if it
- * couldn't be extracted
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @param reqid request id extracted from the event, or {@code null} if it couldn't be
+ * extracted
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
if (reqid == null) {
@@ -524,6 +547,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
Forward ev = makeForward(protocol, topic2, event, reqid);
if (ev == null) {
// invalid args - consume the message
+ logger.warn("constructed an invalid Forward message on topic {}", getTopic());
return true;
}
@@ -536,12 +560,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Handles an event from an external topic.
*
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * invoker should handle it
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
*/
private boolean handleExternal(Forward event) {
if (assignments == null) {
// no bucket assignments yet - add it to the queue
+ logger.info("queued event for request {}", event.getRequestId());
eventq.add(event);
// we've consumed the event
@@ -556,17 +581,17 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Handles a {@link Forward} event, possibly forwarding it again.
*
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * invoker should handle it
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
*/
private boolean handleEvent(Forward event) {
- int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size();
- String target = assignments.getAssignedHost(bucket);
+ String target = assignments.getAssignedHost(event.getRequestId().hashCode());
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
+ logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
return true;
}
@@ -574,6 +599,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
+ logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
return false;
}
@@ -583,6 +609,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
topic);
} else {
+ logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
event.bumpNumHops();
publish(target, event);
}
@@ -678,16 +705,21 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param event
*/
private void inject(Forward event) {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+ logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
- intercept = true;
+ try {
+ intercept = false;
+ listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+
+ } finally {
+ intercept = true;
+ }
}
/**
- * Handles an event from the internal topic. This uses reflection to
- * identify the appropriate process() method to invoke, based on the type of
- * Message that was decoded.
+ * Handles an event from the internal topic. This uses reflection to identify the
+ * appropriate process() method to invoke, based on the type of Message that was
+ * decoded.
*
* @param event the serialized {@link Message} read from the internal topic
*/
@@ -711,29 +743,48 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
} catch (NoSuchMethodException | SecurityException e) {
logger.error("no processor for message {} for topic {}", clazz, topic, e);
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.error("failed to process message {} for topic {}", clazz, topic, e);
-
- } catch (PoolingFeatureException e) {
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | PoolingFeatureException e) {
logger.error("failed to process message {} for topic {}", clazz, topic, e);
}
}
@Override
- public void startDistributing(BucketAssignments assignments) {
- if (assignments == null) {
- return;
+ public CountDownLatch startDistributing(BucketAssignments asgn) {
+ if (asgn == null) {
+ return null;
}
+ logger.info("new assignments for topic {}", getTopic());
+
synchronized (curLocker) {
- this.assignments = assignments;
+ assignments = asgn;
+ }
+
+ /*
+ * publish the events from the event queue, but do it in a background thread so
+ * that the state machine can enter its correct state BEFORE we start processing
+ * the events
+ */
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ synchronized (curLocker) {
+ if (assignments == null) {
+ return;
+ }
- // now that we have assignments, we can process the queue
- Forward ev;
- while ((ev = eventq.poll()) != null) {
- handle(ev);
+ // now that we have assignments, we can process the queue
+ Forward ev;
+ while ((ev = eventq.poll()) != null) {
+ handle(ev);
+ }
+
+ latch.countDown();
}
- }
+ }).start();
+
+ return latch;
}
@Override
@@ -762,8 +813,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Action to run a timer task. Only runs the task if the machine is still in
- * the state that it was in when the timer was created.
+ * Action to run a timer task. Only runs the task if the machine is still in the state
+ * that it was in when the timer was created.
*/
private class TimerAction implements Runnable {
@@ -790,7 +841,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
public void run() {
synchronized (curLocker) {
if (current == origState) {
- changeState(task.fire(null));
+ changeState(task.fire());
}
}
}
@@ -818,7 +869,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @return a new set of extractors
*/
public ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE);
+ return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
+ PoolingProperties.EXTRACTOR_TYPE);
}
/**
@@ -846,8 +898,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*
* @param drools drools controller
* @param topic topic on which the event was received
- * @return {@code true} if the event can be decoded, {@code false}
- * otherwise
+ * @return {@code true} if the event can be decoded, {@code false} otherwise
*/
public boolean canDecodeEvent(DroolsController drools, String topic) {
return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);