summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java121
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java82
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java15
4 files changed, 14 insertions, 218 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
deleted file mode 100644
index 0bed85b5..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import org.onap.policy.drools.pooling.message.Forward;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Finite queue of events waiting to be processed once the buckets have been
- * assigned.
- */
-public class EventQueue {
-
- private static final Logger logger = LoggerFactory.getLogger(EventQueue.class);
-
- /**
- * Maximum number of events allowed in the queue. When excess events are
- * added, the older events are removed.
- */
- private int maxEvents;
-
- /**
- * Maximum age, in milliseconds, of events in the queue. Events that are
- * older than this are discarded rather than being handed off when
- * {@link #poll()} is invoked.
- */
- private long maxAgeMs;
-
- /**
- * The actual queue of events.
- */
- private Deque<Forward> events = new LinkedList<>();
-
- /**
- *
- * @param maxEvents maximum number of events to hold in the queue
- * @param maxAgeMs maximum age of events in the queue
- */
- public EventQueue(int maxEvents, long maxAgeMs) {
- this.maxEvents = maxEvents;
- this.maxAgeMs = maxAgeMs;
- }
-
- /**
- *
- * @return {@code true} if the queue is empty, {@code false} otherwise
- */
- public boolean isEmpty() {
- return events.isEmpty();
- }
-
- /**
- * Clears the queue.
- */
- public void clear() {
- events.clear();
- }
-
- /**
- *
- * @return the number of elements in the queue
- */
- public int size() {
- return events.size();
- }
-
- /**
- * Adds an item to the queue. If the queue is full, the older item is
- * removed and discarded.
- *
- * @param event
- */
- public void add(Forward event) {
- if (events.size() >= maxEvents) {
- logger.warn("full queue - discarded event for topic {}", event.getTopic());
- events.remove();
- }
-
- events.add(event);
- }
-
- /**
- * Gets the oldest, un-expired event from the queue.
- *
- * @return the oldest, un-expired event
- */
- public Forward poll() {
- long tmin = System.currentTimeMillis() - maxAgeMs;
-
- Forward ev;
- while ((ev = events.poll()) != null) {
- if (!ev.isExpired(tmin)) {
- break;
- }
- }
-
- return ev;
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index 5036b605..c25dc12d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling;
-import java.util.concurrent.CountDownLatch;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
@@ -55,22 +54,11 @@ public interface PoolingManager {
public String getTopic();
/**
- * Indicates that communication with internal DMaaP topic failed, typically due to a
- * missed heart beat. Stops the PolicyController.
- *
- * @return a latch that can be used to determine when the controller's stop() method
- * has completed
- */
- public CountDownLatch internalTopicFailed();
-
- /**
* Starts distributing requests according to the given bucket assignments.
*
* @param assignments must <i>not</i> be {@code null}
- * @return a latch that can be used to determine when the events in the event queue
- * have all be processed
*/
- public CountDownLatch startDistributing(BucketAssignments assignments);
+ public void startDistributing(BucketAssignments assignments);
/**
* Gets the current bucket assignments.
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 86cec4c3..68dfee14 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -145,11 +145,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * Queue used when no bucket assignments are available.
- */
- private final EventQueue eventq;
-
- /**
* {@code True} if events offered by the controller should be intercepted,
* {@code false} otherwise.
*/
@@ -175,7 +170,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.eventq = factory.makeEventQueue(props);
this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -307,11 +301,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
public void afterStop() {
synchronized (curLocker) {
- if (!eventq.isEmpty()) {
- logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
- eventq.clear();
- }
-
/*
* stop the publisher, but allow time for any Offline message to be
* transmitted
@@ -381,26 +370,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch internalTopicFailed() {
- logger.error("communication failed for topic {}", topic);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- /*
- * We don't want to build up items in our queue if we can't forward them to other
- * hosts, so we just stop the controller.
- *
- * Use a background thread to prevent deadlocks.
- */
- new Thread(() -> {
- controller.stop();
- latch.countDown();
- }).start();
-
- return latch;
- }
-
- @Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
@@ -556,12 +525,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
private boolean handleExternal(Forward event) {
if (assignments == null) {
- // no bucket assignments yet - add it to the queue
- logger.info("queued event for request {}", event.getRequestId());
- eventq.add(event);
+ // no bucket assignments yet - handle locally
+ logger.info("handle event locally for request {}", event.getRequestId());
- // we've consumed the event
- return true;
+ // we did NOT consume the event
+ return false;
} else {
return handleEvent(event);
@@ -741,42 +709,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch startDistributing(BucketAssignments asgn) {
+ public void startDistributing(BucketAssignments asgn) {
synchronized (curLocker) {
int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
-
- if (asgn == null) {
- return null;
- }
-
- /*
- * publish the events from the event queue, but do it in a background thread so
- * that the state machine can enter its correct state BEFORE we start processing
- * the events
- */
- CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(() -> {
- synchronized (curLocker) {
- if (assignments == null) {
- latch.countDown();
- return;
- }
-
- // now that we have assignments, we can process the queue
- Forward ev;
- while ((ev = eventq.poll()) != null) {
- handle(ev);
- }
-
- latch.countDown();
- }
- }).start();
-
- return latch;
}
@Override
@@ -846,16 +784,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
public static class Factory {
/**
- * Creates an event queue.
- *
- * @param props properties used to configure the event queue
- * @return a new event queue
- */
- public EventQueue makeEventQueue(PoolingProperties props) {
- return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
- }
-
- /**
* Creates object extractors.
*
* @param props properties used to configure the extractors
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index fcb0e139..a1be2a7c 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -147,11 +147,12 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
- if(!getHost().equals(msg.getChannel())) {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic());
+ if (!getHost().equals(msg.getChannel())) {
+ logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
+ getTopic());
return null;
}
-
+
logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
mgr.handle(msg);
return null;
@@ -337,26 +338,26 @@ public abstract class State {
/**
* Indicates that we failed to see our own heartbeat; must be a problem with the
- * internal topic.
+ * internal topic. Assumes the problem is temporary and continues to use the current
+ * bucket assignments.
*
* @return a new {@link StartState}
*/
protected final State missedHeartbeat() {
publish(makeOffline());
- mgr.startDistributing(null);
return mgr.goStart();
}
/**
* Indicates that the internal topic failed; this should only be invoked from the
- * StartState.
+ * StartState. Discards bucket assignments and begins processing everything locally.
*
* @return a new {@link InactiveState}
*/
protected final State internalTopicFailed() {
publish(makeOffline());
- mgr.internalTopicFailed();
+ mgr.startDistributing(null);
return mgr.goInactive();
}