aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java82
1 files changed, 5 insertions, 77 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 86cec4c3..68dfee14 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -145,11 +145,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * Queue used when no bucket assignments are available.
- */
- private final EventQueue eventq;
-
- /**
* {@code True} if events offered by the controller should be intercepted,
* {@code false} otherwise.
*/
@@ -175,7 +170,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.eventq = factory.makeEventQueue(props);
this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -307,11 +301,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
public void afterStop() {
synchronized (curLocker) {
- if (!eventq.isEmpty()) {
- logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
- eventq.clear();
- }
-
/*
* stop the publisher, but allow time for any Offline message to be
* transmitted
@@ -381,26 +370,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch internalTopicFailed() {
- logger.error("communication failed for topic {}", topic);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- /*
- * We don't want to build up items in our queue if we can't forward them to other
- * hosts, so we just stop the controller.
- *
- * Use a background thread to prevent deadlocks.
- */
- new Thread(() -> {
- controller.stop();
- latch.countDown();
- }).start();
-
- return latch;
- }
-
- @Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
@@ -556,12 +525,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
private boolean handleExternal(Forward event) {
if (assignments == null) {
- // no bucket assignments yet - add it to the queue
- logger.info("queued event for request {}", event.getRequestId());
- eventq.add(event);
+ // no bucket assignments yet - handle locally
+ logger.info("handle event locally for request {}", event.getRequestId());
- // we've consumed the event
- return true;
+ // we did NOT consume the event
+ return false;
} else {
return handleEvent(event);
@@ -741,42 +709,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch startDistributing(BucketAssignments asgn) {
+ public void startDistributing(BucketAssignments asgn) {
synchronized (curLocker) {
int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
-
- if (asgn == null) {
- return null;
- }
-
- /*
- * publish the events from the event queue, but do it in a background thread so
- * that the state machine can enter its correct state BEFORE we start processing
- * the events
- */
- CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(() -> {
- synchronized (curLocker) {
- if (assignments == null) {
- latch.countDown();
- return;
- }
-
- // now that we have assignments, we can process the queue
- Forward ev;
- while ((ev = eventq.poll()) != null) {
- handle(ev);
- }
-
- latch.countDown();
- }
- }).start();
-
- return latch;
}
@Override
@@ -846,16 +784,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
public static class Factory {
/**
- * Creates an event queue.
- *
- * @param props properties used to configure the event queue
- * @return a new event queue
- */
- public EventQueue makeEventQueue(PoolingProperties props) {
- return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
- }
-
- /**
* Creates object extractors.
*
* @param props properties used to configure the extractors