summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-07-13 17:11:33 -0400
committerJim Hahn <jrh3@att.com>2018-07-13 17:33:27 -0400
commitf3c23f41a7ed7b85459c5688e5a1f3c177a1031a (patch)
tree3e89b22420e80f7be82db22460da72e2bb58feb9 /feature-pooling-dmaap
parentb65c92040c5acbf9013eccc5c9ce80dbd6587458 (diff)
Don't stop controller on dmaap failure
Modified feature-pooling-dmaap so that it continues to work even if it's unable to communicate with the internal DMaaP topic. When that happens, it simply starts processing locally, until communication is re-established and it receives a bucket assignment. Fixed typo in comment. Added to comment for State.internalTopicFailed(). Removed extra space before @Ignore. Change-Id: I9c851c66c9162c608f2df98e11d49fc526539434 Issue-ID: POLICY-878 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java121
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java82
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java15
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java196
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java34
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java174
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java4
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java4
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java2
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java6
11 files changed, 83 insertions, 569 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
deleted file mode 100644
index 0bed85b5..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import org.onap.policy.drools.pooling.message.Forward;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Finite queue of events waiting to be processed once the buckets have been
- * assigned.
- */
-public class EventQueue {
-
- private static final Logger logger = LoggerFactory.getLogger(EventQueue.class);
-
- /**
- * Maximum number of events allowed in the queue. When excess events are
- * added, the older events are removed.
- */
- private int maxEvents;
-
- /**
- * Maximum age, in milliseconds, of events in the queue. Events that are
- * older than this are discarded rather than being handed off when
- * {@link #poll()} is invoked.
- */
- private long maxAgeMs;
-
- /**
- * The actual queue of events.
- */
- private Deque<Forward> events = new LinkedList<>();
-
- /**
- *
- * @param maxEvents maximum number of events to hold in the queue
- * @param maxAgeMs maximum age of events in the queue
- */
- public EventQueue(int maxEvents, long maxAgeMs) {
- this.maxEvents = maxEvents;
- this.maxAgeMs = maxAgeMs;
- }
-
- /**
- *
- * @return {@code true} if the queue is empty, {@code false} otherwise
- */
- public boolean isEmpty() {
- return events.isEmpty();
- }
-
- /**
- * Clears the queue.
- */
- public void clear() {
- events.clear();
- }
-
- /**
- *
- * @return the number of elements in the queue
- */
- public int size() {
- return events.size();
- }
-
- /**
- * Adds an item to the queue. If the queue is full, the older item is
- * removed and discarded.
- *
- * @param event
- */
- public void add(Forward event) {
- if (events.size() >= maxEvents) {
- logger.warn("full queue - discarded event for topic {}", event.getTopic());
- events.remove();
- }
-
- events.add(event);
- }
-
- /**
- * Gets the oldest, un-expired event from the queue.
- *
- * @return the oldest, un-expired event
- */
- public Forward poll() {
- long tmin = System.currentTimeMillis() - maxAgeMs;
-
- Forward ev;
- while ((ev = events.poll()) != null) {
- if (!ev.isExpired(tmin)) {
- break;
- }
- }
-
- return ev;
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index 5036b605..c25dc12d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling;
-import java.util.concurrent.CountDownLatch;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
@@ -55,22 +54,11 @@ public interface PoolingManager {
public String getTopic();
/**
- * Indicates that communication with internal DMaaP topic failed, typically due to a
- * missed heart beat. Stops the PolicyController.
- *
- * @return a latch that can be used to determine when the controller's stop() method
- * has completed
- */
- public CountDownLatch internalTopicFailed();
-
- /**
* Starts distributing requests according to the given bucket assignments.
*
* @param assignments must <i>not</i> be {@code null}
- * @return a latch that can be used to determine when the events in the event queue
- * have all be processed
*/
- public CountDownLatch startDistributing(BucketAssignments assignments);
+ public void startDistributing(BucketAssignments assignments);
/**
* Gets the current bucket assignments.
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 86cec4c3..68dfee14 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -145,11 +145,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * Queue used when no bucket assignments are available.
- */
- private final EventQueue eventq;
-
- /**
* {@code True} if events offered by the controller should be intercepted,
* {@code false} otherwise.
*/
@@ -175,7 +170,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.eventq = factory.makeEventQueue(props);
this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -307,11 +301,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
public void afterStop() {
synchronized (curLocker) {
- if (!eventq.isEmpty()) {
- logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
- eventq.clear();
- }
-
/*
* stop the publisher, but allow time for any Offline message to be
* transmitted
@@ -381,26 +370,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch internalTopicFailed() {
- logger.error("communication failed for topic {}", topic);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- /*
- * We don't want to build up items in our queue if we can't forward them to other
- * hosts, so we just stop the controller.
- *
- * Use a background thread to prevent deadlocks.
- */
- new Thread(() -> {
- controller.stop();
- latch.countDown();
- }).start();
-
- return latch;
- }
-
- @Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
@@ -556,12 +525,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
private boolean handleExternal(Forward event) {
if (assignments == null) {
- // no bucket assignments yet - add it to the queue
- logger.info("queued event for request {}", event.getRequestId());
- eventq.add(event);
+ // no bucket assignments yet - handle locally
+ logger.info("handle event locally for request {}", event.getRequestId());
- // we've consumed the event
- return true;
+ // we did NOT consume the event
+ return false;
} else {
return handleEvent(event);
@@ -741,42 +709,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
@Override
- public CountDownLatch startDistributing(BucketAssignments asgn) {
+ public void startDistributing(BucketAssignments asgn) {
synchronized (curLocker) {
int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
-
- if (asgn == null) {
- return null;
- }
-
- /*
- * publish the events from the event queue, but do it in a background thread so
- * that the state machine can enter its correct state BEFORE we start processing
- * the events
- */
- CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(() -> {
- synchronized (curLocker) {
- if (assignments == null) {
- latch.countDown();
- return;
- }
-
- // now that we have assignments, we can process the queue
- Forward ev;
- while ((ev = eventq.poll()) != null) {
- handle(ev);
- }
-
- latch.countDown();
- }
- }).start();
-
- return latch;
}
@Override
@@ -846,16 +784,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
public static class Factory {
/**
- * Creates an event queue.
- *
- * @param props properties used to configure the event queue
- * @return a new event queue
- */
- public EventQueue makeEventQueue(PoolingProperties props) {
- return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
- }
-
- /**
* Creates object extractors.
*
* @param props properties used to configure the extractors
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index fcb0e139..a1be2a7c 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -147,11 +147,12 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
- if(!getHost().equals(msg.getChannel())) {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic());
+ if (!getHost().equals(msg.getChannel())) {
+ logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
+ getTopic());
return null;
}
-
+
logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
mgr.handle(msg);
return null;
@@ -337,26 +338,26 @@ public abstract class State {
/**
* Indicates that we failed to see our own heartbeat; must be a problem with the
- * internal topic.
+ * internal topic. Assumes the problem is temporary and continues to use the current
+ * bucket assignments.
*
* @return a new {@link StartState}
*/
protected final State missedHeartbeat() {
publish(makeOffline());
- mgr.startDistributing(null);
return mgr.goStart();
}
/**
* Indicates that the internal topic failed; this should only be invoked from the
- * StartState.
+ * StartState. Discards bucket assignments and begins processing everything locally.
*
* @return a new {@link InactiveState}
*/
protected final State internalTopicFailed() {
publish(makeOffline());
- mgr.internalTopicFailed();
+ mgr.startDistributing(null);
return mgr.goInactive();
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
deleted file mode 100644
index 24144686..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.util.LinkedList;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.message.Forward;
-
-public class EventQueueTest {
-
- private static final int MAX_SIZE = 5;
- private static final long MAX_AGE_MS = 3000L;
-
- private static final String MY_SOURCE = "my.source";
- private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB;
- private static final String MY_TOPIC = "my.topic";
- private static final String MY_PAYLOAD = "my.payload";
- private static final String MY_REQID = "my.request.id";
-
- private EventQueue queue;
-
- @Before
- public void setUp() {
- queue = new EventQueue(MAX_SIZE, MAX_AGE_MS);
-
- }
-
- @Test
- public void testEventQueue() {
- // shouldn't generate an exception
- new EventQueue(1, 1);
- }
-
- @Test
- public void testClear() {
- // add some items
- queue.add(makeActive());
- queue.add(makeActive());
-
- assertFalse(queue.isEmpty());
-
- queue.clear();
-
- // should be empty now
- assertTrue(queue.isEmpty());
- }
-
- @Test
- public void testIsEmpty() {
- // test when empty
- assertTrue(queue.isEmpty());
-
- // all active
- Forward msg1 = makeActive();
- Forward msg2 = makeActive();
- queue.add(msg1);
- assertFalse(queue.isEmpty());
-
- queue.add(msg2);
- assertFalse(queue.isEmpty());
-
- assertEquals(msg1, queue.poll());
- assertFalse(queue.isEmpty());
-
- assertEquals(msg2, queue.poll());
- assertTrue(queue.isEmpty());
-
- // active, expired, expired, active
- queue.add(msg1);
- queue.add(makeInactive());
- queue.add(makeInactive());
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertFalse(queue.isEmpty());
-
- assertEquals(msg2, queue.poll());
- assertTrue(queue.isEmpty());
- }
-
- @Test
- public void testSize() {
- queue = new EventQueue(2, 1000L);
- assertEquals(0, queue.size());
-
- queue.add(makeActive());
- assertEquals(1, queue.size());
-
- queue.poll();
- assertEquals(0, queue.size());
-
- queue.add(makeActive());
- queue.add(makeActive());
- assertEquals(2, queue.size());
-
- queue.poll();
- assertEquals(1, queue.size());
-
- queue.poll();
- assertEquals(0, queue.size());
- }
-
- @Test
- public void testAdd() {
- int nextra = 3;
-
- // create excess messages
- LinkedList<Forward> msgs = new LinkedList<>();
- for (int x = 0; x < MAX_SIZE + nextra; ++x) {
- msgs.add(makeActive());
- }
-
- // add them to the queue
- msgs.forEach(msg -> queue.add(msg));
-
- // should not have added too many messages
- assertEquals(MAX_SIZE, queue.size());
-
- // should have discarded the first "nextra" items
- for (int x = 0; x < MAX_SIZE; ++x) {
- assertEquals("x=" + x, msgs.get(x + nextra), queue.poll());
- }
-
- assertEquals(null, queue.poll());
- }
-
- @Test
- public void testPoll() {
- // poll when empty
- assertNull(queue.poll());
-
- // all active
- Forward msg1 = makeActive();
- Forward msg2 = makeActive();
- queue.add(msg1);
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertEquals(msg2, queue.poll());
- assertEquals(null, queue.poll());
-
- // active, expired, expired, active
- queue.add(msg1);
- queue.add(makeInactive());
- queue.add(makeInactive());
- queue.add(msg2);
-
- assertEquals(msg1, queue.poll());
- assertEquals(msg2, queue.poll());
- assertEquals(null, queue.poll());
-
- // one that's close to the age limit
- msg1 = makeActive();
- msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100);
- queue.add(msg1);
- assertEquals(msg1, queue.poll());
- assertEquals(null, queue.poll());
- }
-
- private Forward makeActive() {
- return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
- }
-
- private Forward makeInactive() {
- Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
-
- msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100);
-
- return msg;
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index 298af064..6280ebed 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -28,9 +28,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
@@ -121,6 +123,7 @@ public class FeatureTest2 {
// these are saved and restored on exit from this test class
private static PoolingFeature.Factory saveFeatureFactory;
private static PoolingManagerImpl.Factory saveManagerFactory;
+ private static DmaapManager.Factory saveDmaapFactory;
/**
* Sink for external DMaaP topic.
@@ -128,6 +131,11 @@ public class FeatureTest2 {
private static TopicSink externalSink;
/**
+ * Sink for internal DMaaP topic.
+ */
+ private static TopicSink internalSink;
+
+ /**
* Context for the current test case.
*/
private Context ctx;
@@ -137,18 +145,23 @@ public class FeatureTest2 {
public static void setUpBeforeClass() {
saveFeatureFactory = PoolingFeature.getFactory();
saveManagerFactory = PoolingManagerImpl.getFactory();
+ saveDmaapFactory = DmaapManager.getFactory();
- Properties props = makeSinkProperties(EXTERNAL_TOPIC);
- externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+ externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
externalSink.start();
+
+ internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+ internalSink.start();
}
@AfterClass
public static void tearDownAfterClass() {
PoolingFeature.setFactory(saveFeatureFactory);
PoolingManagerImpl.setFactory(saveManagerFactory);
+ DmaapManager.setFactory(saveDmaapFactory);
externalSink.stop();
+ internalSink.stop();
}
@Before
@@ -443,6 +456,7 @@ public class FeatureTest2 {
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
private final TopicSource externalSource;
+ private final TopicSource internalSource;
// mock objects
private final PolicyEngine engine = mock(PolicyEngine.class);
@@ -458,8 +472,8 @@ public class FeatureTest2 {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
- Properties props = makeSourceProperties(EXTERNAL_TOPIC);
- externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+ externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+ internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
@@ -489,6 +503,18 @@ public class FeatureTest2 {
* "DMaaP" topic and its own internal "DMaaP" topic.
*/
public void start() {
+ DmaapManager.setFactory(new DmaapManager.Factory() {
+ @Override
+ public List<TopicSource> getTopicSources() {
+ return Arrays.asList(internalSource, externalSource);
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks() {
+ return Arrays.asList(internalSink, externalSink);
+ }
+ });
+
feature.beforeStart(engine);
feature.afterCreate(controller);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 64573ab0..d74b87f9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -23,7 +23,6 @@ package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -106,7 +105,6 @@ public class PoolingManagerImplTest {
private Properties plainProps;
private PoolingProperties poolProps;
private ListeningController controller;
- private EventQueue eventQueue;
private ClassExtractors extractors;
private DmaapManager dmaap;
private ScheduledThreadPoolExecutor sched;
@@ -146,14 +144,12 @@ public class PoolingManagerImplTest {
active = new CountDownLatch(1);
factory = mock(Factory.class);
- eventQueue = mock(EventQueue.class);
extractors = mock(ClassExtractors.class);
dmaap = mock(DmaapManager.class);
controller = mock(ListeningController.class);
sched = mock(ScheduledThreadPoolExecutor.class);
drools = mock(DroolsController.class);
- when(factory.makeEventQueue(any())).thenReturn(eventQueue);
when(factory.makeClassExtractors(any())).thenReturn(extractors);
when(factory.makeDmaapManager(any())).thenReturn(dmaap);
when(factory.makeScheduler()).thenReturn(sched);
@@ -304,24 +300,25 @@ public class PoolingManagerImplTest {
@Test
public void testBeforeStop() throws Exception {
startMgr();
- mgr.startDistributing(makeAssignments(true));
+ mgr.startDistributing(makeAssignments(false));
- // verify that this message is not queued
Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
mgr.handle(msg);
- verify(eventQueue, never()).add(msg);
+ verify(dmaap, times(START_PUB+1)).publish(any());
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
+ verify(dmaap, times(START_PUB+2)).publish(any());
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
- // verify that next message is queued
- mgr.handle(msg);
- verify(eventQueue).add(msg);
+ // verify that next message is handled locally
+ mgr.handle(msg);
+ verify(dmaap, times(START_PUB+2)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
}
@Test
@@ -360,26 +357,8 @@ public class PoolingManagerImplTest {
startMgr();
mgr.beforeStop();
- when(eventQueue.isEmpty()).thenReturn(false);
- when(eventQueue.size()).thenReturn(3);
-
- mgr.afterStop();
-
- verify(eventQueue).clear();
- verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
- }
-
- @Test
- public void testAfterStop_EmptyQueue() throws Exception {
- startMgr();
- mgr.beforeStop();
-
- when(eventQueue.isEmpty()).thenReturn(true);
- when(eventQueue.size()).thenReturn(0);
-
mgr.afterStop();
- verify(eventQueue, never()).clear();
verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@@ -499,18 +478,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testInternalTopicFailed() throws Exception {
- startMgr();
-
- CountDownLatch latch = mgr.internalTopicFailed();
-
- // wait for the thread to complete
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- verify(controller).stop();
- }
-
- @Test
public void testSchedule() throws Exception {
// must start the scheduler
startMgr();
@@ -693,14 +660,7 @@ public class PoolingManagerImplTest {
public void testBeforeInsert_NoIntercept() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
@@ -726,37 +686,20 @@ public class PoolingManagerImplTest {
startMgr();
assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- // should not have tried to enqueue a message
- verify(eventQueue, never()).add(any());
}
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
startMgr();
- long tbegin = System.currentTimeMillis();
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
}
@Test
@@ -921,25 +864,26 @@ public class PoolingManagerImplTest {
@Test
public void testMakeForward() throws Exception {
startMgr();
-
- long tbegin = System.currentTimeMillis();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
- verify(eventQueue).add(msgCap.capture());
-
- validateMessageContent(tbegin, msgCap.getValue());
+
+ verify(dmaap, times(START_PUB+1)).publish(any());
}
@Test
public void testMakeForward_InvalidMsg() throws Exception {
startMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
- // should not have tried to enqueue a message
- verify(eventQueue, never()).add(any());
+ // should not have tried to publish a message
+ verify(dmaap, times(START_PUB)).publish(any());
}
@Test
@@ -1064,69 +1008,27 @@ public class PoolingManagerImplTest {
startMgr();
// route the message to this host
- assertNotNull(mgr.startDistributing(makeAssignments(true)));
+ mgr.startDistributing(makeAssignments(true));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue, never()).add(any());
+ verify(dmaap, times(START_PUB)).publish(any());
- // null assignments should cause message to be queued
- assertNull(mgr.startDistributing(null));
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
+ // null assignments should cause message to be processed locally
+ mgr.startDistributing(null);
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ verify(dmaap, times(START_PUB)).publish(any());
// route the message to this host
- assertNotNull(mgr.startDistributing(makeAssignments(true)));
+ mgr.startDistributing(makeAssignments(true));
assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
+ verify(dmaap, times(START_PUB)).publish(any());
// route the message to the other host
- assertNotNull(mgr.startDistributing(makeAssignments(false)));
+ mgr.startDistributing(makeAssignments(false));
assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(eventQueue).add(any());
- }
-
- @Test
- public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
- startMgr();
-
- // put items in the queue
- LinkedList<Forward> lst = new LinkedList<>();
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
- when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
- // route the messages to this host
- CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- // all of the events should have been processed locally
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testStartDistributing_EventsInQueue_Forward() throws Exception {
- startMgr();
-
- // put items in the queue
- LinkedList<Forward> lst = new LinkedList<>();
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
- lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
- when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
- // route the messages to the OTHER host
- CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
- assertTrue(latch.await(2, TimeUnit.SECONDS));
-
- // all of the events should have been forwarded
- verify(dmaap, times(4)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ verify(dmaap, times(START_PUB+1)).publish(any());
}
@Test
@@ -1228,22 +1130,6 @@ public class PoolingManagerImplTest {
}
/**
- * Validates the message content.
- *
- * @param tbegin creation time stamp must be no less than this
- * @param msg message to be validated
- */
- private void validateMessageContent(long tbegin, Forward msg) {
- assertEquals(0, msg.getNumHops());
- assertTrue(msg.getCreateTimeMs() >= tbegin);
- assertEquals(mgr.getHost(), msg.getSource());
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
- assertEquals(TOPIC2, msg.getTopic());
- assertEquals(THE_EVENT, msg.getPayload());
- assertEquals(REQUEST_ID, msg.getRequestId());
- }
-
- /**
* Configure the mock controller to act like a real controller, invoking beforeOffer
* and then beforeInsert, so we can make sure they pass through. We'll keep count to
* ensure we don't get into infinite recursion.
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index 27284dcd..f2701038 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -394,8 +394,8 @@ public class ActiveStateTest extends BasicStateTester {
// fire the task - should transition
assertEquals(next, task.third().fire());
- // should stop distributing
- verify(mgr).startDistributing(null);
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
// should publish an offline message
Offline msg = captureAdminMessage(Offline.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index 80778ed4..7cd37581 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -257,8 +257,8 @@ public class QueryStateTest extends BasicStateTester {
assertEquals(next, timer.second().fire());
- // should stop distributing
- verify(mgr).startDistributing(null);
+ // should continue distributing
+ verify(mgr, never()).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 01f49b55..18f12ff8 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -111,7 +111,7 @@ public class StartStateTest extends BasicStateTester {
assertEquals(next, checker.second().fire());
- verify(mgr).internalTopicFailed();
+ verify(mgr).startDistributing(null);
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index cdf9b59a..e3d383d9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -423,7 +423,8 @@ public class StateTest extends BasicStateTester {
State next2 = state.missedHeartbeat();
assertEquals(next, next2);
- verify(mgr).startDistributing(null);
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());
@@ -437,7 +438,8 @@ public class StateTest extends BasicStateTester {
State next2 = state.internalTopicFailed();
assertEquals(next, next2);
- verify(mgr).internalTopicFailed();
+ // should stop distributing
+ verify(mgr).startDistributing(null);
Offline msg = captureAdminMessage(Offline.class);
assertEquals(MY_HOST, msg.getSource());