diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
4 files changed, 14 insertions, 218 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java deleted file mode 100644 index 0bed85b5..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import java.util.Deque; -import java.util.LinkedList; -import org.onap.policy.drools.pooling.message.Forward; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Finite queue of events waiting to be processed once the buckets have been - * assigned. - */ -public class EventQueue { - - private static final Logger logger = LoggerFactory.getLogger(EventQueue.class); - - /** - * Maximum number of events allowed in the queue. When excess events are - * added, the older events are removed. - */ - private int maxEvents; - - /** - * Maximum age, in milliseconds, of events in the queue. Events that are - * older than this are discarded rather than being handed off when - * {@link #poll()} is invoked. - */ - private long maxAgeMs; - - /** - * The actual queue of events. - */ - private Deque<Forward> events = new LinkedList<>(); - - /** - * - * @param maxEvents maximum number of events to hold in the queue - * @param maxAgeMs maximum age of events in the queue - */ - public EventQueue(int maxEvents, long maxAgeMs) { - this.maxEvents = maxEvents; - this.maxAgeMs = maxAgeMs; - } - - /** - * - * @return {@code true} if the queue is empty, {@code false} otherwise - */ - public boolean isEmpty() { - return events.isEmpty(); - } - - /** - * Clears the queue. - */ - public void clear() { - events.clear(); - } - - /** - * - * @return the number of elements in the queue - */ - public int size() { - return events.size(); - } - - /** - * Adds an item to the queue. If the queue is full, the older item is - * removed and discarded. - * - * @param event - */ - public void add(Forward event) { - if (events.size() >= maxEvents) { - logger.warn("full queue - discarded event for topic {}", event.getTopic()); - events.remove(); - } - - events.add(event); - } - - /** - * Gets the oldest, un-expired event from the queue. - * - * @return the oldest, un-expired event - */ - public Forward poll() { - long tmin = System.currentTimeMillis() - maxAgeMs; - - Forward ev; - while ((ev = events.poll()) != null) { - if (!ev.isExpired(tmin)) { - break; - } - } - - return ev; - } - -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index 5036b605..c25dc12d 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling; -import java.util.concurrent.CountDownLatch; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Message; @@ -55,22 +54,11 @@ public interface PoolingManager { public String getTopic(); /** - * Indicates that communication with internal DMaaP topic failed, typically due to a - * missed heart beat. Stops the PolicyController. - * - * @return a latch that can be used to determine when the controller's stop() method - * has completed - */ - public CountDownLatch internalTopicFailed(); - - /** * Starts distributing requests according to the given bucket assignments. * * @param assignments must <i>not</i> be {@code null} - * @return a latch that can be used to determine when the events in the event queue - * have all be processed */ - public CountDownLatch startDistributing(BucketAssignments assignments); + public void startDistributing(BucketAssignments assignments); /** * Gets the current bucket assignments. diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 86cec4c3..68dfee14 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -145,11 +145,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private ScheduledThreadPoolExecutor scheduler = null; /** - * Queue used when no bucket assignments are available. - */ - private final EventQueue eventq; - - /** * {@code True} if events offered by the controller should be intercepted, * {@code false} otherwise. */ @@ -175,7 +170,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); - this.eventq = factory.makeEventQueue(props); this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); @@ -307,11 +301,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ public void afterStop() { synchronized (curLocker) { - if (!eventq.isEmpty()) { - logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic); - eventq.clear(); - } - /* * stop the publisher, but allow time for any Offline message to be * transmitted @@ -381,26 +370,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } @Override - public CountDownLatch internalTopicFailed() { - logger.error("communication failed for topic {}", topic); - - CountDownLatch latch = new CountDownLatch(1); - - /* - * We don't want to build up items in our queue if we can't forward them to other - * hosts, so we just stop the controller. - * - * Use a background thread to prevent deadlocks. - */ - new Thread(() -> { - controller.stop(); - latch.countDown(); - }).start(); - - return latch; - } - - @Override public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { // wrap the task in a TimerAction and schedule it ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); @@ -556,12 +525,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ private boolean handleExternal(Forward event) { if (assignments == null) { - // no bucket assignments yet - add it to the queue - logger.info("queued event for request {}", event.getRequestId()); - eventq.add(event); + // no bucket assignments yet - handle locally + logger.info("handle event locally for request {}", event.getRequestId()); - // we've consumed the event - return true; + // we did NOT consume the event + return false; } else { return handleEvent(event); @@ -741,42 +709,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } @Override - public CountDownLatch startDistributing(BucketAssignments asgn) { + public void startDistributing(BucketAssignments asgn) { synchronized (curLocker) { int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); assignments = asgn; } - - if (asgn == null) { - return null; - } - - /* - * publish the events from the event queue, but do it in a background thread so - * that the state machine can enter its correct state BEFORE we start processing - * the events - */ - CountDownLatch latch = new CountDownLatch(1); - - new Thread(() -> { - synchronized (curLocker) { - if (assignments == null) { - latch.countDown(); - return; - } - - // now that we have assignments, we can process the queue - Forward ev; - while ((ev = eventq.poll()) != null) { - handle(ev); - } - - latch.countDown(); - } - }).start(); - - return latch; } @Override @@ -846,16 +784,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { public static class Factory { /** - * Creates an event queue. - * - * @param props properties used to configure the event queue - * @return a new event queue - */ - public EventQueue makeEventQueue(PoolingProperties props) { - return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs()); - } - - /** * Creates object extractors. * * @param props properties used to configure the extractors diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index fcb0e139..a1be2a7c 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -147,11 +147,12 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { - if(!getHost().equals(msg.getChannel())) { - logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic()); + if (!getHost().equals(msg.getChannel())) { + logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), + getTopic()); return null; } - + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); mgr.handle(msg); return null; @@ -337,26 +338,26 @@ public abstract class State { /** * Indicates that we failed to see our own heartbeat; must be a problem with the - * internal topic. + * internal topic. Assumes the problem is temporary and continues to use the current + * bucket assignments. * * @return a new {@link StartState} */ protected final State missedHeartbeat() { publish(makeOffline()); - mgr.startDistributing(null); return mgr.goStart(); } /** * Indicates that the internal topic failed; this should only be invoked from the - * StartState. + * StartState. Discards bucket assignments and begins processing everything locally. * * @return a new {@link InactiveState} */ protected final State internalTopicFailed() { publish(makeOffline()); - mgr.internalTopicFailed(); + mgr.startDistributing(null); return mgr.goInactive(); } |