diff options
Diffstat (limited to 'feature-pooling-dmaap/src')
11 files changed, 83 insertions, 569 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java deleted file mode 100644 index 0bed85b5..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import java.util.Deque; -import java.util.LinkedList; -import org.onap.policy.drools.pooling.message.Forward; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Finite queue of events waiting to be processed once the buckets have been - * assigned. - */ -public class EventQueue { - - private static final Logger logger = LoggerFactory.getLogger(EventQueue.class); - - /** - * Maximum number of events allowed in the queue. When excess events are - * added, the older events are removed. - */ - private int maxEvents; - - /** - * Maximum age, in milliseconds, of events in the queue. Events that are - * older than this are discarded rather than being handed off when - * {@link #poll()} is invoked. - */ - private long maxAgeMs; - - /** - * The actual queue of events. - */ - private Deque<Forward> events = new LinkedList<>(); - - /** - * - * @param maxEvents maximum number of events to hold in the queue - * @param maxAgeMs maximum age of events in the queue - */ - public EventQueue(int maxEvents, long maxAgeMs) { - this.maxEvents = maxEvents; - this.maxAgeMs = maxAgeMs; - } - - /** - * - * @return {@code true} if the queue is empty, {@code false} otherwise - */ - public boolean isEmpty() { - return events.isEmpty(); - } - - /** - * Clears the queue. - */ - public void clear() { - events.clear(); - } - - /** - * - * @return the number of elements in the queue - */ - public int size() { - return events.size(); - } - - /** - * Adds an item to the queue. If the queue is full, the older item is - * removed and discarded. - * - * @param event - */ - public void add(Forward event) { - if (events.size() >= maxEvents) { - logger.warn("full queue - discarded event for topic {}", event.getTopic()); - events.remove(); - } - - events.add(event); - } - - /** - * Gets the oldest, un-expired event from the queue. - * - * @return the oldest, un-expired event - */ - public Forward poll() { - long tmin = System.currentTimeMillis() - maxAgeMs; - - Forward ev; - while ((ev = events.poll()) != null) { - if (!ev.isExpired(tmin)) { - break; - } - } - - return ev; - } - -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index 5036b605..c25dc12d 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling; -import java.util.concurrent.CountDownLatch; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Message; @@ -55,22 +54,11 @@ public interface PoolingManager { public String getTopic(); /** - * Indicates that communication with internal DMaaP topic failed, typically due to a - * missed heart beat. Stops the PolicyController. - * - * @return a latch that can be used to determine when the controller's stop() method - * has completed - */ - public CountDownLatch internalTopicFailed(); - - /** * Starts distributing requests according to the given bucket assignments. * * @param assignments must <i>not</i> be {@code null} - * @return a latch that can be used to determine when the events in the event queue - * have all be processed */ - public CountDownLatch startDistributing(BucketAssignments assignments); + public void startDistributing(BucketAssignments assignments); /** * Gets the current bucket assignments. diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 86cec4c3..68dfee14 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -145,11 +145,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private ScheduledThreadPoolExecutor scheduler = null; /** - * Queue used when no bucket assignments are available. - */ - private final EventQueue eventq; - - /** * {@code True} if events offered by the controller should be intercepted, * {@code false} otherwise. */ @@ -175,7 +170,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); - this.eventq = factory.makeEventQueue(props); this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); @@ -307,11 +301,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ public void afterStop() { synchronized (curLocker) { - if (!eventq.isEmpty()) { - logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic); - eventq.clear(); - } - /* * stop the publisher, but allow time for any Offline message to be * transmitted @@ -381,26 +370,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } @Override - public CountDownLatch internalTopicFailed() { - logger.error("communication failed for topic {}", topic); - - CountDownLatch latch = new CountDownLatch(1); - - /* - * We don't want to build up items in our queue if we can't forward them to other - * hosts, so we just stop the controller. - * - * Use a background thread to prevent deadlocks. - */ - new Thread(() -> { - controller.stop(); - latch.countDown(); - }).start(); - - return latch; - } - - @Override public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { // wrap the task in a TimerAction and schedule it ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); @@ -556,12 +525,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ private boolean handleExternal(Forward event) { if (assignments == null) { - // no bucket assignments yet - add it to the queue - logger.info("queued event for request {}", event.getRequestId()); - eventq.add(event); + // no bucket assignments yet - handle locally + logger.info("handle event locally for request {}", event.getRequestId()); - // we've consumed the event - return true; + // we did NOT consume the event + return false; } else { return handleEvent(event); @@ -741,42 +709,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } @Override - public CountDownLatch startDistributing(BucketAssignments asgn) { + public void startDistributing(BucketAssignments asgn) { synchronized (curLocker) { int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); assignments = asgn; } - - if (asgn == null) { - return null; - } - - /* - * publish the events from the event queue, but do it in a background thread so - * that the state machine can enter its correct state BEFORE we start processing - * the events - */ - CountDownLatch latch = new CountDownLatch(1); - - new Thread(() -> { - synchronized (curLocker) { - if (assignments == null) { - latch.countDown(); - return; - } - - // now that we have assignments, we can process the queue - Forward ev; - while ((ev = eventq.poll()) != null) { - handle(ev); - } - - latch.countDown(); - } - }).start(); - - return latch; } @Override @@ -846,16 +784,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { public static class Factory { /** - * Creates an event queue. - * - * @param props properties used to configure the event queue - * @return a new event queue - */ - public EventQueue makeEventQueue(PoolingProperties props) { - return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs()); - } - - /** * Creates object extractors. * * @param props properties used to configure the extractors diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index fcb0e139..a1be2a7c 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -147,11 +147,12 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { - if(!getHost().equals(msg.getChannel())) { - logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic()); + if (!getHost().equals(msg.getChannel())) { + logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), + getTopic()); return null; } - + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); mgr.handle(msg); return null; @@ -337,26 +338,26 @@ public abstract class State { /** * Indicates that we failed to see our own heartbeat; must be a problem with the - * internal topic. + * internal topic. Assumes the problem is temporary and continues to use the current + * bucket assignments. * * @return a new {@link StartState} */ protected final State missedHeartbeat() { publish(makeOffline()); - mgr.startDistributing(null); return mgr.goStart(); } /** * Indicates that the internal topic failed; this should only be invoked from the - * StartState. + * StartState. Discards bucket assignments and begins processing everything locally. * * @return a new {@link InactiveState} */ protected final State internalTopicFailed() { publish(makeOffline()); - mgr.internalTopicFailed(); + mgr.startDistributing(null); return mgr.goInactive(); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java deleted file mode 100644 index 24144686..00000000 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import java.util.LinkedList; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; -import org.onap.policy.drools.pooling.message.Forward; - -public class EventQueueTest { - - private static final int MAX_SIZE = 5; - private static final long MAX_AGE_MS = 3000L; - - private static final String MY_SOURCE = "my.source"; - private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB; - private static final String MY_TOPIC = "my.topic"; - private static final String MY_PAYLOAD = "my.payload"; - private static final String MY_REQID = "my.request.id"; - - private EventQueue queue; - - @Before - public void setUp() { - queue = new EventQueue(MAX_SIZE, MAX_AGE_MS); - - } - - @Test - public void testEventQueue() { - // shouldn't generate an exception - new EventQueue(1, 1); - } - - @Test - public void testClear() { - // add some items - queue.add(makeActive()); - queue.add(makeActive()); - - assertFalse(queue.isEmpty()); - - queue.clear(); - - // should be empty now - assertTrue(queue.isEmpty()); - } - - @Test - public void testIsEmpty() { - // test when empty - assertTrue(queue.isEmpty()); - - // all active - Forward msg1 = makeActive(); - Forward msg2 = makeActive(); - queue.add(msg1); - assertFalse(queue.isEmpty()); - - queue.add(msg2); - assertFalse(queue.isEmpty()); - - assertEquals(msg1, queue.poll()); - assertFalse(queue.isEmpty()); - - assertEquals(msg2, queue.poll()); - assertTrue(queue.isEmpty()); - - // active, expired, expired, active - queue.add(msg1); - queue.add(makeInactive()); - queue.add(makeInactive()); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertFalse(queue.isEmpty()); - - assertEquals(msg2, queue.poll()); - assertTrue(queue.isEmpty()); - } - - @Test - public void testSize() { - queue = new EventQueue(2, 1000L); - assertEquals(0, queue.size()); - - queue.add(makeActive()); - assertEquals(1, queue.size()); - - queue.poll(); - assertEquals(0, queue.size()); - - queue.add(makeActive()); - queue.add(makeActive()); - assertEquals(2, queue.size()); - - queue.poll(); - assertEquals(1, queue.size()); - - queue.poll(); - assertEquals(0, queue.size()); - } - - @Test - public void testAdd() { - int nextra = 3; - - // create excess messages - LinkedList<Forward> msgs = new LinkedList<>(); - for (int x = 0; x < MAX_SIZE + nextra; ++x) { - msgs.add(makeActive()); - } - - // add them to the queue - msgs.forEach(msg -> queue.add(msg)); - - // should not have added too many messages - assertEquals(MAX_SIZE, queue.size()); - - // should have discarded the first "nextra" items - for (int x = 0; x < MAX_SIZE; ++x) { - assertEquals("x=" + x, msgs.get(x + nextra), queue.poll()); - } - - assertEquals(null, queue.poll()); - } - - @Test - public void testPoll() { - // poll when empty - assertNull(queue.poll()); - - // all active - Forward msg1 = makeActive(); - Forward msg2 = makeActive(); - queue.add(msg1); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertEquals(msg2, queue.poll()); - assertEquals(null, queue.poll()); - - // active, expired, expired, active - queue.add(msg1); - queue.add(makeInactive()); - queue.add(makeInactive()); - queue.add(msg2); - - assertEquals(msg1, queue.poll()); - assertEquals(msg2, queue.poll()); - assertEquals(null, queue.poll()); - - // one that's close to the age limit - msg1 = makeActive(); - msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100); - queue.add(msg1); - assertEquals(msg1, queue.poll()); - assertEquals(null, queue.poll()); - } - - private Forward makeActive() { - return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID); - } - - private Forward makeInactive() { - Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID); - - msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100); - - return msg; - } - -} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java index 298af064..6280ebed 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java @@ -28,9 +28,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX; import java.io.IOException; +import java.util.Arrays; import java.util.Deque; import java.util.IdentityHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; @@ -121,6 +123,7 @@ public class FeatureTest2 { // these are saved and restored on exit from this test class private static PoolingFeature.Factory saveFeatureFactory; private static PoolingManagerImpl.Factory saveManagerFactory; + private static DmaapManager.Factory saveDmaapFactory; /** * Sink for external DMaaP topic. @@ -128,6 +131,11 @@ public class FeatureTest2 { private static TopicSink externalSink; /** + * Sink for internal DMaaP topic. + */ + private static TopicSink internalSink; + + /** * Context for the current test case. */ private Context ctx; @@ -137,18 +145,23 @@ public class FeatureTest2 { public static void setUpBeforeClass() { saveFeatureFactory = PoolingFeature.getFactory(); saveManagerFactory = PoolingManagerImpl.getFactory(); + saveDmaapFactory = DmaapManager.getFactory(); - Properties props = makeSinkProperties(EXTERNAL_TOPIC); - externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0); + externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0); externalSink.start(); + + internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0); + internalSink.start(); } @AfterClass public static void tearDownAfterClass() { PoolingFeature.setFactory(saveFeatureFactory); PoolingManagerImpl.setFactory(saveManagerFactory); + DmaapManager.setFactory(saveDmaapFactory); externalSink.stop(); + internalSink.stop(); } @Before @@ -443,6 +456,7 @@ public class FeatureTest2 { private final AtomicBoolean sawMsg = new AtomicBoolean(false); private final TopicSource externalSource; + private final TopicSource internalSource; // mock objects private final PolicyEngine engine = mock(PolicyEngine.class); @@ -458,8 +472,8 @@ public class FeatureTest2 { when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - Properties props = makeSourceProperties(EXTERNAL_TOPIC); - externalSource = TopicEndpoint.manager.addTopicSources(props).get(0); + externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0); + internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0); // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { @@ -489,6 +503,18 @@ public class FeatureTest2 { * "DMaaP" topic and its own internal "DMaaP" topic. */ public void start() { + DmaapManager.setFactory(new DmaapManager.Factory() { + @Override + public List<TopicSource> getTopicSources() { + return Arrays.asList(internalSource, externalSource); + } + + @Override + public List<TopicSink> getTopicSinks() { + return Arrays.asList(internalSink, externalSink); + } + }); + feature.beforeStart(engine); feature.afterCreate(controller); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 64573ab0..d74b87f9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -23,7 +23,6 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -106,7 +105,6 @@ public class PoolingManagerImplTest { private Properties plainProps; private PoolingProperties poolProps; private ListeningController controller; - private EventQueue eventQueue; private ClassExtractors extractors; private DmaapManager dmaap; private ScheduledThreadPoolExecutor sched; @@ -146,14 +144,12 @@ public class PoolingManagerImplTest { active = new CountDownLatch(1); factory = mock(Factory.class); - eventQueue = mock(EventQueue.class); extractors = mock(ClassExtractors.class); dmaap = mock(DmaapManager.class); controller = mock(ListeningController.class); sched = mock(ScheduledThreadPoolExecutor.class); drools = mock(DroolsController.class); - when(factory.makeEventQueue(any())).thenReturn(eventQueue); when(factory.makeClassExtractors(any())).thenReturn(extractors); when(factory.makeDmaapManager(any())).thenReturn(dmaap); when(factory.makeScheduler()).thenReturn(sched); @@ -304,24 +300,25 @@ public class PoolingManagerImplTest { @Test public void testBeforeStop() throws Exception { startMgr(); - mgr.startDistributing(makeAssignments(true)); + mgr.startDistributing(makeAssignments(false)); - // verify that this message is not queued Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); mgr.handle(msg); - verify(eventQueue, never()).add(msg); + verify(dmaap, times(START_PUB+1)).publish(any()); mgr.beforeStop(); verify(dmaap).stopConsumer(mgr); verify(sched).shutdownNow(); + verify(dmaap, times(START_PUB+2)).publish(any()); verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); - // verify that next message is queued - mgr.handle(msg); - verify(eventQueue).add(msg); + // verify that next message is handled locally + mgr.handle(msg); + verify(dmaap, times(START_PUB+2)).publish(any()); + verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); } @Test @@ -360,26 +357,8 @@ public class PoolingManagerImplTest { startMgr(); mgr.beforeStop(); - when(eventQueue.isEmpty()).thenReturn(false); - when(eventQueue.size()).thenReturn(3); - - mgr.afterStop(); - - verify(eventQueue).clear(); - verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); - } - - @Test - public void testAfterStop_EmptyQueue() throws Exception { - startMgr(); - mgr.beforeStop(); - - when(eventQueue.isEmpty()).thenReturn(true); - when(eventQueue.size()).thenReturn(0); - mgr.afterStop(); - verify(eventQueue, never()).clear(); verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @@ -499,18 +478,6 @@ public class PoolingManagerImplTest { } @Test - public void testInternalTopicFailed() throws Exception { - startMgr(); - - CountDownLatch latch = mgr.internalTopicFailed(); - - // wait for the thread to complete - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - verify(controller).stop(); - } - - @Test public void testSchedule() throws Exception { // must start the scheduler startMgr(); @@ -693,14 +660,7 @@ public class PoolingManagerImplTest { public void testBeforeInsert_NoIntercept() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test @@ -726,37 +686,20 @@ public class PoolingManagerImplTest { startMgr(); assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT)); - - // should not have tried to enqueue a message - verify(eventQueue, never()).add(any()); } @Test public void testHandleExternalCommInfrastructureStringStringString() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test public void testHandleExternalForward_NoAssignments() throws Exception { startMgr(); - long tbegin = System.currentTimeMillis(); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); } @Test @@ -921,25 +864,26 @@ public class PoolingManagerImplTest { @Test public void testMakeForward() throws Exception { startMgr(); - - long tbegin = System.currentTimeMillis(); + + // route the message to another host + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class); - verify(eventQueue).add(msgCap.capture()); - - validateMessageContent(tbegin, msgCap.getValue()); + + verify(dmaap, times(START_PUB+1)).publish(any()); } @Test public void testMakeForward_InvalidMsg() throws Exception { startMgr(); + + // route the message to another host + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT)); - // should not have tried to enqueue a message - verify(eventQueue, never()).add(any()); + // should not have tried to publish a message + verify(dmaap, times(START_PUB)).publish(any()); } @Test @@ -1064,69 +1008,27 @@ public class PoolingManagerImplTest { startMgr(); // route the message to this host - assertNotNull(mgr.startDistributing(makeAssignments(true))); + mgr.startDistributing(makeAssignments(true)); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue, never()).add(any()); + verify(dmaap, times(START_PUB)).publish(any()); - // null assignments should cause message to be queued - assertNull(mgr.startDistributing(null)); - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); + // null assignments should cause message to be processed locally + mgr.startDistributing(null); + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + verify(dmaap, times(START_PUB)).publish(any()); // route the message to this host - assertNotNull(mgr.startDistributing(makeAssignments(true))); + mgr.startDistributing(makeAssignments(true)); assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); + verify(dmaap, times(START_PUB)).publish(any()); // route the message to the other host - assertNotNull(mgr.startDistributing(makeAssignments(false))); + mgr.startDistributing(makeAssignments(false)); assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(eventQueue).add(any()); - } - - @Test - public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception { - startMgr(); - - // put items in the queue - LinkedList<Forward> lst = new LinkedList<>(); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - - when(eventQueue.poll()).thenAnswer(args -> lst.poll()); - - // route the messages to this host - CountDownLatch latch = mgr.startDistributing(makeAssignments(true)); - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - // all of the events should have been processed locally - verify(dmaap, times(START_PUB)).publish(any()); - verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); - } - - @Test - public void testStartDistributing_EventsInQueue_Forward() throws Exception { - startMgr(); - - // put items in the queue - LinkedList<Forward> lst = new LinkedList<>(); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID)); - - when(eventQueue.poll()).thenAnswer(args -> lst.poll()); - - // route the messages to the OTHER host - CountDownLatch latch = mgr.startDistributing(makeAssignments(false)); - assertTrue(latch.await(2, TimeUnit.SECONDS)); - - // all of the events should have been forwarded - verify(dmaap, times(4)).publish(any()); - verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); + verify(dmaap, times(START_PUB+1)).publish(any()); } @Test @@ -1228,22 +1130,6 @@ public class PoolingManagerImplTest { } /** - * Validates the message content. - * - * @param tbegin creation time stamp must be no less than this - * @param msg message to be validated - */ - private void validateMessageContent(long tbegin, Forward msg) { - assertEquals(0, msg.getNumHops()); - assertTrue(msg.getCreateTimeMs() >= tbegin); - assertEquals(mgr.getHost(), msg.getSource()); - assertEquals(CommInfrastructure.UEB, msg.getProtocol()); - assertEquals(TOPIC2, msg.getTopic()); - assertEquals(THE_EVENT, msg.getPayload()); - assertEquals(REQUEST_ID, msg.getRequestId()); - } - - /** * Configure the mock controller to act like a real controller, invoking beforeOffer * and then beforeInsert, so we can make sure they pass through. We'll keep count to * ensure we don't get into infinite recursion. diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index 27284dcd..f2701038 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -394,8 +394,8 @@ public class ActiveStateTest extends BasicStateTester { // fire the task - should transition assertEquals(next, task.third().fire()); - // should stop distributing - verify(mgr).startDistributing(null); + // should continue to distribute + verify(mgr, never()).startDistributing(null); // should publish an offline message Offline msg = captureAdminMessage(Offline.class); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index 80778ed4..7cd37581 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -257,8 +257,8 @@ public class QueryStateTest extends BasicStateTester { assertEquals(next, timer.second().fire()); - // should stop distributing - verify(mgr).startDistributing(null); + // should continue distributing + verify(mgr, never()).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index 01f49b55..18f12ff8 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -111,7 +111,7 @@ public class StartStateTest extends BasicStateTester { assertEquals(next, checker.second().fire()); - verify(mgr).internalTopicFailed(); + verify(mgr).startDistributing(null); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index cdf9b59a..e3d383d9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -423,7 +423,8 @@ public class StateTest extends BasicStateTester { State next2 = state.missedHeartbeat(); assertEquals(next, next2); - verify(mgr).startDistributing(null); + // should continue to distribute + verify(mgr, never()).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); @@ -437,7 +438,8 @@ public class StateTest extends BasicStateTester { State next2 = state.internalTopicFailed(); assertEquals(next, next2); - verify(mgr).internalTopicFailed(); + // should stop distributing + verify(mgr).startDistributing(null); Offline msg = captureAdminMessage(Offline.class); assertEquals(MY_HOST, msg.getSource()); |