diff options
Diffstat (limited to 'feature-pooling-dmaap/src')
10 files changed, 103 insertions, 40 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 2bec4579..1e2071ab 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; @@ -67,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Maps a controller name to its associated manager. */ - private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + + /** + * Decremented each time a manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch = new CountDownLatch(1); /** * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is @@ -102,6 +108,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF return host; } + /** + * @return a latch that will be decremented when a manager enters the active state + */ + protected CountDownLatch getActiveLatch() { + return activeLatch; + } + @Override public int getSequenceNumber() { return 0; @@ -135,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingProperties props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); - ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props)); + ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name, e); @@ -386,10 +399,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF * @param host name/uuid of this host * @param controller * @param props properties to use to configure the manager + * @param activeLatch decremented when the manager goes Active * @return a new pooling manager */ - public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) { - return new PoolingManagerImpl(host, controller, props); + public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + return new PoolingManagerImpl(host, controller, props, activeLatch); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index d2312469..de25e471 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -94,6 +94,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final TopicListener listener; /** + * Decremented each time the manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch; + + /** * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -142,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Queue used when no bucket assignments are available. */ - private EventQueue eventq; + private final EventQueue eventq; /** * {@code True} if events offered by the controller should be intercepted, @@ -156,11 +161,15 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param host name/uuid of this host * @param controller controller with which this is associated * @param props feature properties specific to the controller + * @param activeLatch latch to be decremented each time the manager enters the Active + * state */ - public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) { + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { this.host = host; this.controller = controller; this.props = props; + this.activeLatch = activeLatch; try { this.listener = (TopicListener) controller; @@ -237,13 +246,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @return DMaaP properties */ private Properties makeDmaapProps(PolicyController controller, Properties source) { - SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source); + SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); // could be UEB or DMAAP, so add both - addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - return props; + return specProps; } /** @@ -255,7 +264,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ private void addDmaapConsumerProps(SpecProperties props, String prefix) { String fullpfx = props.getSpecPrefix() + prefix + "." + topic; - + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); } @@ -429,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -444,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -633,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { - logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); + logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -829,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public State goActive() { + activeLatch.countDown(); return new ActiveState(this); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index b62ea0a7..c831f706 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -107,11 +107,11 @@ public class SpecProperties extends Properties { @Override public final int hashCode() { - throw new UnsupportedOperationException("HostBucket cannot be hashed"); + throw new UnsupportedOperationException("SpecProperties cannot be hashed"); } @Override public final boolean equals(Object obj) { - throw new UnsupportedOperationException("cannot compare HostBuckets"); + throw new UnsupportedOperationException("cannot compare SpecProperties"); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index da044252..f717aa52 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -45,7 +45,7 @@ public class InactiveState extends State { @Override public void start() { super.start(); - schedule(getProperties().getReactivateMs(), () -> goStart()); + schedule(getProperties().getReactivateMs(), this::goStart); } @Override diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 54e93230..545c2ef0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -122,7 +122,7 @@ public abstract class State { protected State goActive(BucketAssignments asgn) { startDistributing(asgn); - if (asgn.hasAssignment(getHost())) { + if (asgn != null && asgn.hasAssignment(getHost())) { return mgr.goActive(); } else { diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index 29dc15e4..a5688df6 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.LinkedList; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -264,11 +265,15 @@ public class DmaapManagerTest { long minms = 2000L; // tell the publisher to stop in minms + additional time - Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L)); + CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread(() -> { + latch.countDown(); + mgr.stopPublisher(minms + 3000L); + }); thread.start(); - // give the thread a chance to start - Thread.sleep(50L); + // wait for the thread to start + latch.await(); // interrupt it - it should immediately finish its work thread.interrupt(); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java index 84449e75..6884bec8 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java @@ -189,8 +189,7 @@ public class FeatureTest2 { } ctx.startHosts(); - - ctx.awaitEvents(STD_IDENTIFICATION_MS * 2, TimeUnit.MILLISECONDS); + ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2); for (int x = 0; x < nmessages; ++x) { ctx.offerExternal(makeMessage(x)); @@ -414,6 +413,21 @@ public class FeatureTest2 { public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { return eventCounter.await(time, units); } + + /** + * Waits, for a period of time, for all hosts to enter the Active state. + * + * @param timeMs maximum time to wait, in milliseconds + * @throws InterruptedException + */ + public void awaitAllActive(long timeMs) throws InterruptedException { + long tend = timeMs + System.currentTimeMillis(); + + for (Host host : hosts) { + long tremain = Math.max(0, tend - System.currentTimeMillis()); + assertTrue(host.awaitActive(tremain)); + } + } } /** @@ -459,6 +473,18 @@ public class FeatureTest2 { } /** + * Waits, for a period of time, for the host to enter the Active state. + * + * @param timeMs time to wait, in milliseconds + * @return {@code true} if the host entered the Active state within the given + * amount of time, {@code false} otherwise + * @throws InterruptedException + */ + public boolean awaitActive(long timeMs) throws InterruptedException { + return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS); + } + + /** * Starts threads for the host so that it begins consuming from both the external * "DMaaP" topic and its own internal "DMaaP" topic. */ diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index 9ee2d976..f8f37559 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -123,7 +123,7 @@ public class PoolingFeatureTest { when(factory.getController(drools2)).thenReturn(controller2); when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled); - when(factory.makeManager(any(), any(), any())).thenAnswer(args -> { + when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> { PoolingProperties props = args.getArgument(2); PoolingManagerImpl mgr = mock(PoolingManagerImpl.class); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 693cb6de..e0024b79 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -113,6 +113,7 @@ public class PoolingManagerImplTest { private DroolsController drools; private Serializer ser; private Factory factory; + private CountDownLatch active; private PoolingManagerImpl mgr; @@ -142,6 +143,7 @@ public class PoolingManagerImplTest { futures = new LinkedList<>(); ser = new Serializer(); + active = new CountDownLatch(1); factory = mock(Factory.class); eventQueue = mock(EventQueue.class); @@ -181,7 +183,7 @@ public class PoolingManagerImplTest { PoolingManagerImpl.setFactory(factory); - mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps); + mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active); } @Test @@ -204,7 +206,7 @@ public class PoolingManagerImplTest { PolicyController ctlr = mock(PolicyController.class); PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class, - () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps)); + () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active)); assertNotNull(ex.getCause()); assertTrue(ex.getCause() instanceof ClassCastException); } @@ -216,7 +218,7 @@ public class PoolingManagerImplTest { when(factory.makeDmaapManager(any(), any())).thenThrow(ex); PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class, - () -> new PoolingManagerImpl(MY_HOST, controller, poolProps)); + () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active)); assertEquals(ex, ex2.getCause()); } @@ -233,7 +235,7 @@ public class PoolingManagerImplTest { public void testGetHost() { assertEquals(MY_HOST, mgr.getHost()); - mgr = new PoolingManagerImpl(HOST2, controller, poolProps); + mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active); assertEquals(HOST2, mgr.getHost()); } @@ -1102,7 +1104,6 @@ public class PoolingManagerImplTest { // route the messages to this host CountDownLatch latch = mgr.startDistributing(makeAssignments(true)); - assertNotNull(latch); assertTrue(latch.await(2, TimeUnit.SECONDS)); // all of the events should have been processed locally @@ -1123,7 +1124,8 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to the OTHER host - assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS)); + CountDownLatch latch = mgr.startDistributing(makeAssignments(false)); + assertTrue(latch.await(2, TimeUnit.SECONDS)); // all of the events should have been forwarded verify(dmaap, times(4)).publish(any()); @@ -1159,6 +1161,7 @@ public class PoolingManagerImplTest { assertTrue(st instanceof ActiveState); assertEquals(mgr.getHost(), st.getHost()); assertEquals(asgn, mgr.getAssignments()); + assertEquals(0, active.getCount()); } @Test @@ -1166,6 +1169,7 @@ public class PoolingManagerImplTest { State st = mgr.goInactive(); assertTrue(st instanceof InactiveState); assertEquals(mgr.getHost(), st.getHost()); + assertEquals(1, active.getCount()); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index 08b55c6b..47624aa0 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -186,6 +186,19 @@ public class StateTest extends BasicStateTester { } @Test + public void testGoActive_NullAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + assertEquals(inact, state.goActive(null)); + + verify(mgr, never()).startDistributing(any()); + } + + @Test public void testGoInactive() { State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); |