diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap')
19 files changed, 1750 insertions, 498 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index f68f2395..a91671fd 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -21,6 +21,8 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -211,11 +213,11 @@ public class DmaapManagerTest { expectException("startPublisher,start", xxx -> mgr.startPublisher()); expectException("startPublisher,publish", xxx -> mgr.publish(MSG)); - + // allow it to succeed this time reset(sink); when(sink.send(any())).thenReturn(true); - + mgr.startPublisher(); verify(sink).start(); @@ -227,29 +229,64 @@ public class DmaapManagerTest { @Test public void testStopPublisher() throws PoolingFeatureException { // not publishing yet, so stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink, never()).stop(); - + // now start it mgr.startPublisher(); - + // this time, stop should do something - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); - + // re-stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); } @Test + public void testStopPublisher_WithDelay() throws PoolingFeatureException { + + mgr.startPublisher(); + + long tbeg = System.currentTimeMillis(); + + mgr.stopPublisher(100L); + + assertTrue(System.currentTimeMillis() >= tbeg + 100L); + } + + @Test + public void testStopPublisher_WithDelayInterrupted() throws Exception { + + mgr.startPublisher(); + + long minms = 2000L; + + // tell the publisher to stop in minms + additional time + Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L)); + thread.start(); + + // give the thread a chance to start + Thread.sleep(50L); + + // interrupt it - it should immediately finish its work + thread.interrupt(); + + // wait for it to stop, but only wait the minimum time + thread.join(minms); + + assertFalse(thread.isAlive()); + } + + @Test public void testStopPublisher_Exception() throws PoolingFeatureException { mgr.startPublisher(); - + // force exception when it stops doThrow(new IllegalStateException("expected")).when(sink).stop(); - mgr.stopPublisher(); + mgr.stopPublisher(0); } @Test @@ -270,14 +307,14 @@ public class DmaapManagerTest { // not consuming yet, so stopping should have no effect mgr.stopConsumer(listener); verify(source, never()).unregister(any()); - + // now start it mgr.startConsumer(listener); - + // this time, stop should do something mgr.stopConsumer(listener); verify(source).unregister(listener); - + // re-stopping should have no effect mgr.stopConsumer(listener); verify(source).unregister(listener); @@ -292,7 +329,7 @@ public class DmaapManagerTest { public void testSetFilter_Exception() throws PoolingFeatureException { // force an error when setFilter() is called doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any()); - + mgr.setFilter(FILTER); } @@ -300,41 +337,41 @@ public class DmaapManagerTest { public void testPublish() throws PoolingFeatureException { // cannot publish before starting expectException("publish,pre", xxx -> mgr.publish(MSG)); - + mgr.startPublisher(); - + // publish several messages mgr.publish(MSG); verify(sink).send(MSG); - - mgr.publish(MSG+"a"); - verify(sink).send(MSG+"a"); - - mgr.publish(MSG+"b"); - verify(sink).send(MSG+"b"); - + + mgr.publish(MSG + "a"); + verify(sink).send(MSG + "a"); + + mgr.publish(MSG + "b"); + verify(sink).send(MSG + "b"); + // stop and verify we can no longer publish - mgr.stopPublisher(); + mgr.stopPublisher(0); expectException("publish,stopped", xxx -> mgr.publish(MSG)); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendFailed() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to fail when(sink.send(MSG)).thenReturn(false); - + mgr.publish(MSG); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendEx() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to throw an exception doThrow(new IllegalStateException("expected")).when(sink).send(MSG); - + mgr.publish(MSG); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java deleted file mode 100644 index 5f918f73..00000000 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.generalize; -import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize; -import java.util.Properties; -import org.junit.Test; - -public class FeatureEnabledCheckerTest { - - private static final String PROP_NAME = "enable.{?.}it"; - - private static final String SPEC = "my.specializer"; - - @Test - public void test() { - assertFalse(check(null, null)); - assertTrue(check(null, true)); - assertFalse(check(null, false)); - - assertTrue(check(true, null)); - assertTrue(check(true, true)); - assertFalse(check(true, false)); - - assertFalse(check(false, null)); - assertTrue(check(false, true)); - assertFalse(check(false, false)); - } - - /** - * Adds properties, as specified, and checks if the feature is enabled. - * - * @param wantGen value to assign to the generalized property, or - * {@code null} to leave it unset - * @param wantSpec value to assign to the specialized property, or - * {@code null} to leave it unset - * @return {@code true} if the feature is enabled, {@code false} otherwise - */ - public boolean check(Boolean wantGen, Boolean wantSpec) { - Properties props = new Properties(); - - if (wantGen != null) { - props.setProperty(generalize(PROP_NAME), wantGen.toString()); - } - - if (wantSpec != null) { - props.setProperty(specialize(PROP_NAME, SPEC), wantSpec.toString()); - } - - return FeatureEnabledChecker.isFeatureEnabled(props, SPEC, PROP_NAME); - } - -} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java new file mode 100644 index 00000000..13d70f52 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -0,0 +1,1158 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Arrays; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.FilterableTopicSource; +import org.onap.policy.drools.event.comm.Topic; +import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; +import org.onap.policy.drools.event.comm.TopicListener; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having + * its own feature object. + */ +public class FeatureTest { + + private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class); + + /** + * Name of the topic used for inter-host communication. + */ + private static final String INTERNAL_TOPIC = "my.internal.topic"; + + /** + * Name of the topic from which "external" events "arrive". + */ + private static final String EXTERNAL_TOPIC = "my.external.topic"; + + /** + * Name of the controller. + */ + private static final String CONTROLLER1 = "controller.one"; + + // private static final long STD_HEARTBEAT_WAIT_MS = 100; + // private static final long STD_REACTIVATE_WAIT_MS = 200; + // private static final long STD_IDENTIFICATION_MS = 60; + // private static final long STD_ACTIVE_HEARTBEAT_MS = 5; + // private static final long STD_INTER_HEARTBEAT_MS = 50; + // private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + // private static final long POLL_MS = 2; + // private static final long INTER_POLL_MS = 2; + // private static final long EVENT_WAIT_SEC = 5; + + // use these to slow things down + private static final long STD_HEARTBEAT_WAIT_MS = 5000; + private static final long STD_REACTIVATE_WAIT_MS = 10000; + private static final long STD_IDENTIFICATION_MS = 10000; + private static final long STD_ACTIVE_HEARTBEAT_MS = 5000; + private static final long STD_INTER_HEARTBEAT_MS = 12000; + private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + private static final long POLL_MS = 2; + private static final long INTER_POLL_MS = 2000; + private static final long EVENT_WAIT_SEC = 1000; + + // these are saved and restored on exit from this test class + private static PoolingFeature.Factory saveFeatureFactory; + private static PoolingManagerImpl.Factory saveManagerFactory; + private static DmaapManager.Factory saveDmaapFactory; + + /** + * Context for the current test case. + */ + private Context ctx; + + @BeforeClass + public static void setUpBeforeClass() { + saveFeatureFactory = PoolingFeature.getFactory(); + saveManagerFactory = PoolingManagerImpl.getFactory(); + saveDmaapFactory = DmaapManager.getFactory(); + } + + @AfterClass + public static void tearDownAfterClass() { + PoolingFeature.setFactory(saveFeatureFactory); + PoolingManagerImpl.setFactory(saveManagerFactory); + DmaapManager.setFactory(saveDmaapFactory); + } + + @Before + public void setUp() { + ctx = null; + } + + @After + public void tearDown() { + if (ctx != null) { + ctx.destroy(); + } + } + + @Ignore + @Test + public void test_SingleHost() throws Exception { + int nmessages = 70; + + ctx = new Context(nmessages); + + ctx.addHost(); + ctx.startHosts(); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + @Ignore + @Test + public void test_TwoHosts() throws Exception { + int nmessages = 200; + + ctx = new Context(nmessages); + + ctx.addHost(); + ctx.addHost(); + ctx.startHosts(); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + // wait for all hosts to have time to process a few messages + Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3); + + // pause a topic for a bit +// ctx.pauseTopic(); + + // now we'll see if it recovers + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + private String makeMessage(int reqnum) { + return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; + } + + /** + * Context used for a single test case. + */ + private static class Context { + + private final FeatureFactory featureFactory; + private final ManagerFactory managerFactory; + private final DmaapFactory dmaapFactory; + + /** + * Hosts that have been added to this context. + */ + private final Deque<Host> hosts = new LinkedList<>(); + + /** + * Maps a drools controller to its policy controller. + */ + private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); + + /** + * Maps a channel to its queue. Does <i>not</i> include the "admin" channel. + */ + private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7); + + /** + * Queue for the external "DMaaP" topic. + */ + private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>(); + + /** + * Counts the number of decode errors. + */ + private final AtomicInteger nDecodeErrors = new AtomicInteger(0); + + /** + * Number of events we're still waiting to receive. + */ + private final CountDownLatch eventCounter; + + /** + * Maps host name to its topic source. This must be in sorted order so we can + * identify the source for the host with the higher name. + */ + private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>(); + + /** + * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by + * {@link #getCurrentHost()}. + */ + private Host currentHost = null; + + /** + * + * @param nEvents number of events to be processed + */ + public Context(int nEvents) { + featureFactory = new FeatureFactory(this); + managerFactory = new ManagerFactory(this); + dmaapFactory = new DmaapFactory(this); + eventCounter = new CountDownLatch(nEvents); + + PoolingFeature.setFactory(featureFactory); + PoolingManagerImpl.setFactory(managerFactory); + DmaapManager.setFactory(dmaapFactory); + } + + /** + * Destroys the context, stopping any hosts that remain. + */ + public void destroy() { + stopHosts(); + hosts.clear(); + } + + /** + * Creates and adds a new host to the context. + */ + public void addHost() { + hosts.add(new Host(this)); + } + + /** + * Starts the hosts. + */ + public void startHosts() { + hosts.forEach(host -> host.start()); + } + + /** + * Stops the hosts. + */ + public void stopHosts() { + hosts.forEach(host -> host.stop()); + } + + /** + * Verifies that all hosts processed at least one message. + */ + public void checkAllSawAMsg() { + int x = 0; + for (Host host : hosts) { + assertTrue("x=" + x, host.messageSeen()); + ++x; + } + } + + /** + * Sets {@link #currentHost} to the specified host, and then invokes the given + * function. Resets {@link #currentHost} to {@code null} before returning. + * + * @param host + * @param func function to invoke + */ + public void withHost(Host host, VoidFunction func) { + currentHost = host; + func.apply(); + currentHost = null; + } + + /** + * Offers an event to the external topic. + * + * @param event + */ + public void offerExternal(String event) { + externalTopic.offer(event); + } + + /** + * Adds an internal channel to the set of channels. + * + * @param channel + * @param queue the channel's queue + */ + public void addInternal(String channel, BlockingQueue<String> queue) { + channel2queue.put(channel, queue); + } + + /** + * Offers a message to all internal channels. + * + * @param message + */ + public void offerInternal(String message) { + channel2queue.values().forEach(queue -> queue.offer(message)); + } + + /** + * Offers amessage to an internal channel. + * + * @param channel + * @param message + */ + public void offerInternal(String channel, String message) { + BlockingQueue<String> queue = channel2queue.get(channel); + if (queue != null) { + queue.offer(message); + } + } + + /** + * Decodes an event. + * + * @param event + * @return the decoded event, or {@code null} if it cannot be decoded + */ + public Object decodeEvent(String event) { + return managerFactory.decodeEvent(null, null, event); + } + + /** + * Associates a controller with its drools controller. + * + * @param controller + * @param droolsController + */ + public void addController(PolicyController controller, DroolsController droolsController) { + drools2policy.put(droolsController, controller); + } + + /** + * @param droolsController + * @return the controller associated with a drools controller, or {@code null} if + * it has no associated controller + */ + public PolicyController getController(DroolsController droolsController) { + return drools2policy.get(droolsController); + } + + /** + * @return queue for the external topic + */ + public BlockingQueue<String> getExternalTopic() { + return externalTopic; + } + + /** + * + * @return the number of decode errors so far + */ + public int getDecodeErrors() { + return nDecodeErrors.get(); + } + + /** + * Increments the count of decode errors. + */ + public void bumpDecodeErrors() { + nDecodeErrors.incrementAndGet(); + } + + /** + * + * @return the number of events that haven't been processed + */ + public long getRemainingEvents() { + return eventCounter.getCount(); + } + + /** + * Adds an event to the counter. + */ + public void addEvent() { + eventCounter.countDown(); + } + + /** + * Waits, for a period of time, for all events to be processed. + * + * @param time + * @param units + * @return {@code true} if all events have been processed, {@code false} otherwise + * @throws InterruptedException + */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { + return eventCounter.await(time, units); + } + + /** + * Associates a host with a topic. + * + * @param host + * @param topic + */ + public void addTopicSource(String host, TopicSourceImpl topic) { + host2topic.put(host, topic); + } + + /** + * Pauses the last topic source long enough to miss a heart beat. + */ + public void pauseTopic() { + Entry<String, TopicSourceImpl> ent = host2topic.lastEntry(); + if (ent != null) { + ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS); + } + } + + /** + * Gets the current host, provided this is used from within a call to + * {@link #withHost(Host, VoidFunction)}. + * + * @return the current host, or {@code null} if there is no current host + */ + public Host getCurrentHost() { + return currentHost; + } + } + + /** + * Simulates a single "host". + */ + private static class Host { + + private final Context context; + + private final PoolingFeature feature = new PoolingFeature(); + + /** + * {@code True} if this host has processed a message, {@code false} otherwise. + */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); + + /** + * This host's internal "DMaaP" topic. + */ + private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(); + + /** + * Source that reads from the external topic and posts to the listener. + */ + private TopicSource externalSource; + + // mock objects + private final PolicyEngine engine = mock(PolicyEngine.class); + private final ListenerController controller = mock(ListenerController.class); + private final DroolsController drools = mock(DroolsController.class); + + /** + * + * @param context + */ + public Host(Context context) { + this.context = context; + + when(controller.getName()).thenReturn(CONTROLLER1); + when(controller.getDrools()).thenReturn(drools); + + // stop consuming events if the controller stops + when(controller.stop()).thenAnswer(args -> { + externalSource.unregister(controller); + return true; + }); + + doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); + + context.addController(controller, drools); + + // arrange to read from the external topic + externalSource = new TopicSourceImpl(context, false); + } + + /** + * Gets the host name. This should only be invoked within {@link #start()}. + * + * @return the host name + */ + public String getName() { + return PoolingManagerImpl.getLastHost(); + } + + /** + * Starts threads for the host so that it begins consuming from both the external + * "DMaaP" topic and its own internal "DMaaP" topic. + */ + public void start() { + + context.withHost(this, () -> { + + feature.beforeStart(engine); + feature.afterCreate(controller); + + // assign the queue for this host's internal topic + context.addInternal(getName(), msgQueue); + + feature.beforeStart(controller); + + // start consuming events from the external topic + externalSource.register(controller); + + feature.afterStart(controller); + }); + } + + /** + * Stops the host's threads. + */ + public void stop() { + feature.beforeStop(controller); + externalSource.unregister(controller); + feature.afterStop(controller); + } + + /** + * Offers an event to the feature, before the policy controller handles it. + * + * @param protocol + * @param topic2 + * @param event + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { + return feature.beforeOffer(controller, protocol, topic2, event); + } + + /** + * Offers an event to the feature, after the policy controller handles it. + * + * @param protocol + * @param topic + * @param event + * @param success + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + + return feature.afterOffer(controller, protocol, topic, event, success); + } + + /** + * Offers an event to the feature, before the drools controller handles it. + * + * @param fact + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeInsert(Object fact) { + return feature.beforeInsert(drools, fact); + } + + /** + * Offers an event to the feature, after the drools controller handles it. + * + * @param fact + * @param successInsert {@code true} if it was successfully inserted by the drools + * controller, {@code false} otherwise + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterInsert(Object fact, boolean successInsert) { + return feature.afterInsert(drools, fact, successInsert); + } + + /** + * Indicates that a message was seen for this host. + */ + public void sawMessage() { + sawMsg.set(true); + } + + /** + * + * @return {@code true} if a message was seen for this host, {@code false} + * otherwise + */ + public boolean messageSeen() { + return sawMsg.get(); + } + + /** + * @return the queue associated with this host's internal topic + */ + public BlockingQueue<String> getInternalQueue() { + return msgQueue; + } + } + + /** + * Listener for the external topic. Simulates the actions taken by + * <i>AggregatedPolicyController.onTopicEvent</i>. + */ + private static class MyExternalTopicListener implements Answer<Void> { + + private final Context context; + private final Host host; + + public MyExternalTopicListener(Context context, Host host) { + this.context = context; + this.host = host; + } + + @Override + public Void answer(InvocationOnMock args) throws Throwable { + int i = 0; + CommInfrastructure commType = args.getArgument(i++); + String topic = args.getArgument(i++); + String event = args.getArgument(i++); + + if (host.beforeOffer(commType, topic, event)) { + return null; + } + + boolean result; + Object fact = context.decodeEvent(event); + + if (fact == null) { + result = false; + context.bumpDecodeErrors(); + + } else { + result = true; + + if (!host.beforeInsert(fact)) { + // feature did not handle it so we handle it here + host.afterInsert(fact, result); + + host.sawMessage(); + context.addEvent(); + } + } + + host.afterOffer(commType, topic, event, result); + return null; + } + } + + /** + * Sink implementation that puts a message on the queue specified by the + * <i>channel</i> embedded within the message. If it's the "admin" channel, then the + * message is placed on all queues. + */ + private static class TopicSinkImpl extends TopicImpl implements TopicSink { + + private final Context context; + + /** + * Used to decode the messages so that the channel can be extracted. + */ + private final Serializer serializer = new Serializer(); + + /** + * + * @param context + */ + public TopicSinkImpl(Context context) { + this.context = context; + } + + @Override + public synchronized boolean send(String message) { + if (!isAlive()) { + return false; + } + + try { + Message msg = serializer.decodeMsg(message); + String channel = msg.getChannel(); + + if (Message.ADMIN.equals(channel)) { + // add to every queue + context.offerInternal(message); + + } else { + // add to a specific queue + context.offerInternal(channel, message); + } + + return true; + + } catch (IOException e) { + logger.warn("could not decode message: {}", message); + context.bumpDecodeErrors(); + return false; + } + } + } + + /** + * Source implementation that reads from a queue associated with a topic. + */ + private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource { + + private final String topic; + + /** + * Queue from which to retrieve messages. + */ + private final BlockingQueue<String> queue; + + /** + * Manages the current consumer thread. The "first" item is used as a trigger to + * tell the thread to stop processing, while the "second" item is triggered <i>by + * the thread</i> when it completes. + */ + private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null); + + /** + * Time, in milliseconds, to pause before polling for more messages. + */ + private AtomicLong pauseTimeMs = new AtomicLong(0); + + /** + * + * @param context + * @param internal {@code true} if to read from the internal topic, {@code false} + * to read from the external topic + */ + public TopicSourceImpl(Context context, boolean internal) { + if (internal) { + Host host = context.getCurrentHost(); + + this.topic = INTERNAL_TOPIC; + this.queue = host.getInternalQueue(); + + context.addTopicSource(host.getName(), this); + + } else { + this.topic = EXTERNAL_TOPIC; + this.queue = context.getExternalTopic(); + } + } + + @Override + public void setFilter(String filter) { + logger.info("topic filter set to: {}", filter); + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public boolean offer(String event) { + throw new UnsupportedOperationException("offer topic source"); + } + + /** + * Starts a thread that takes messages from the queue and gives them to the + * listener. Stops the thread of any previously registered listener. + */ + @Override + public void register(TopicListener listener) { + Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1)); + + reregister(newPair); + + new Thread(() -> { + try { + do { + processMessages(newPair.first(), listener); + } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS)); + + logger.info("topic source thread completed"); + + } catch (InterruptedException e) { + logger.warn("topic source thread aborted", e); + Thread.currentThread().interrupt(); + + } catch (RuntimeException e) { + logger.warn("topic source thread aborted", e); + } + + newPair.second().countDown(); + + }).start(); + } + + /** + * Stops the thread of <i>any</i> currently registered listener. + */ + @Override + public void unregister(TopicListener listener) { + reregister(null); + } + + /** + * Registers a new "pair" with this source, stopping the consumer associated with + * any previous registration. + * + * @param newPair the new "pair", or {@code null} to unregister + */ + private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) { + try { + Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair); + if (oldPair == null) { + if (newPair == null) { + // unregister was invoked twice in a row + logger.warn("re-unregister for topic source"); + } + + // no previous thread to stop + return; + } + + // need to stop the previous thread + + // tell it to stop + oldPair.first().countDown(); + + // wait for it to stop + if (!oldPair.second().await(2, TimeUnit.SECONDS)) { + logger.warn("old topic registration is still running"); + } + + } catch (InterruptedException e) { + logger.warn("old topic registration may still be running", e); + Thread.currentThread().interrupt(); + } + + if (newPair != null) { + // register was invoked twice in a row + logger.warn("re-register for topic source"); + } + } + + /** + * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should + * pause a bit. + * + * @param timeMs time, in milliseconds, to pause + */ + public void pause(long timeMs) { + pauseTimeMs.set(timeMs); + } + + /** + * Polls for messages from the topic and offers them to the listener. If + * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and + * then immediately returns. + * + * @param stopped triggered if processing should stop + * @param listener + * @throws InterruptedException + */ + private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { + + for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) { + + long ptm = pauseTimeMs.getAndSet(0); + if (ptm != 0) { + logger.warn("pause processing"); + stopped.await(ptm, TimeUnit.MILLISECONDS); + return; + } + + String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS); + if (msg == null) { + return; + } + + listener.onTopicEvent(CommInfrastructure.UEB, topic, msg); + } + } + } + + /** + * Topic implementation. Most methods just throw + * {@link UnsupportedOperationException}. + */ + private static class TopicImpl implements Topic { + + /** + * {@code True} if this topic is alive/running, {@code false} otherwise. + */ + private boolean alive = false; + + /** + * + */ + public TopicImpl() { + super(); + } + + @Override + public String getTopic() { + return INTERNAL_TOPIC; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + throw new UnsupportedOperationException("topic protocol"); + } + + @Override + public List<String> getServers() { + throw new UnsupportedOperationException("topic servers"); + } + + @Override + public String[] getRecentEvents() { + throw new UnsupportedOperationException("topic events"); + } + + @Override + public void register(TopicListener topicListener) { + throw new UnsupportedOperationException("register topic"); + } + + @Override + public void unregister(TopicListener topicListener) { + throw new UnsupportedOperationException("unregister topic"); + } + + @Override + public synchronized boolean start() { + if (alive) { + throw new IllegalStateException("topic already started"); + } + + alive = true; + return true; + } + + @Override + public synchronized boolean stop() { + if (!alive) { + throw new IllegalStateException("topic is not running"); + } + + alive = false; + return true; + } + + @Override + public synchronized void shutdown() { + alive = false; + } + + @Override + public synchronized boolean isAlive() { + return alive; + } + + @Override + public boolean lock() { + throw new UnsupportedOperationException("lock topicink"); + } + + @Override + public boolean unlock() { + throw new UnsupportedOperationException("unlock topic"); + } + + @Override + public boolean isLocked() { + throw new UnsupportedOperationException("topic isLocked"); + } + } + + /** + * Simulator for the feature-level factory. + */ + private static class FeatureFactory extends PoolingFeature.Factory { + + private final Context context; + + /** + * + * @param context + */ + public FeatureFactory(Context context) { + this.context = context; + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public Properties getProperties(String featName) { + Properties props = new Properties(); + + props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); + + props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC); + props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true"); + props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000"); + props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000"); + props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS); + props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds", + "" + STD_ACTIVE_HEARTBEAT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds", + "" + STD_OFFLINE_PUB_WAIT_MS); + + return props; + } + + @Override + public PolicyController getController(DroolsController droolsController) { + return context.getController(droolsController); + } + } + + /** + * Simulator for the pooling manager factory. + */ + private static class ManagerFactory extends PoolingManagerImpl.Factory { + + /** + * Used to decode events from the external topic. + */ + private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() { + @Override + protected ObjectMapper initialValue() { + return new ObjectMapper(); + } + }; + + /** + * Used to decode events into a Map. + */ + private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {}; + + /** + * + * @param context + */ + public ManagerFactory(Context context) { + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public boolean canDecodeEvent(DroolsController drools, String topic) { + return true; + } + + @Override + public Object decodeEvent(DroolsController drools, String topic, String event) { + try { + return mapper.get().readValue(event, typeRef); + + } catch (IOException e) { + logger.warn("cannot decode external event", e); + return null; + } + } + } + + /** + * Simulator for the dmaap manager factory. + */ + private static class DmaapFactory extends DmaapManager.Factory { + + private final Context context; + + /** + * + * @param context + */ + public DmaapFactory(Context context) { + this.context = context; + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public List<TopicSource> initTopicSources(Properties props) { + return Arrays.asList(new TopicSourceImpl(context, true)); + } + + @Override + public List<TopicSink> initTopicSinks(Properties props) { + return Arrays.asList(new TopicSinkImpl(context)); + } + } + + /** + * Controller that also implements the {@link TopicListener} interface. + */ + private static interface ListenerController extends PolicyController, TopicListener { + + } + + /** + * Simple function that takes no arguments and returns nothing. + */ + @FunctionalInterface + private static interface VoidFunction { + + public void apply(); + } +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java index 5b423d4b..34b604c9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java @@ -31,12 +31,4 @@ public class PoolingFeatureExceptionTest extends ExceptionsTester { assertEquals(5, test(PoolingFeatureException.class)); } - @Test - public void testToRuntimeException() { - PoolingFeatureException plainExc = new PoolingFeatureException("hello"); - PoolingFeatureRtException runtimeExc = plainExc.toRuntimeException(); - - assertTrue(plainExc == runtimeExc.getCause()); - } - } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index cd1aea09..7782e475 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -40,12 +41,11 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.pooling.PoolingFeature.Factory; import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.utils.Pair; public class PoolingFeatureTest { - private static final String CONFIG_DIR = "src/test/java/org/onap/policy/drools/pooling"; - private static final String CONTROLLER1 = "controllerA"; private static final String CONTROLLER2 = "controllerB"; private static final String CONTROLLER_DISABLED = "controllerDisabled"; @@ -66,6 +66,8 @@ public class PoolingFeatureTest { */ private static Factory saveFactory; + private Properties props; + private PolicyEngine engine; private PolicyController controller1; private PolicyController controller2; private PolicyController controllerDisabled; @@ -94,6 +96,8 @@ public class PoolingFeatureTest { @Before public void setUp() throws Exception { + props = initProperties(); + engine = mock(PolicyEngine.class); factory = mock(Factory.class); controller1 = mock(PolicyController.class); controller2 = mock(PolicyController.class); @@ -113,12 +117,13 @@ public class PoolingFeatureTest { when(controllerException.getName()).thenReturn(CONTROLLER_EX); when(controllerUnknown.getName()).thenReturn(CONTROLLER_UNKNOWN); + when(factory.getProperties(PoolingProperties.FEATURE_NAME)).thenReturn(props); when(factory.getController(drools1)).thenReturn(controller1); when(factory.getController(drools2)).thenReturn(controller2); when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled); when(factory.makeManager(any(), any())).thenAnswer(args -> { - PoolingProperties props = args.getArgumentAt(1, PoolingProperties.class); + PoolingProperties props = args.getArgument(1); PoolingManagerImpl mgr = mock(PoolingManagerImpl.class); @@ -129,7 +134,7 @@ public class PoolingFeatureTest { pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); pool.afterCreate(controller1); pool.afterCreate(controller2); @@ -149,24 +154,17 @@ public class PoolingFeatureTest { } @Test - public void testGlobalInit() { - pool = new PoolingFeature(); - - pool.globalInit(null, CONFIG_DIR); - } - - @Test(expected = PoolingFeatureRtException.class) - public void testGlobalInit_NotFound() { + public void testBeforeStartEngine() { pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR + "/unknown"); + assertFalse(pool.beforeStart(engine)); } @Test public void testAfterCreate() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controller1)); assertEquals(1, managers.size()); @@ -184,7 +182,7 @@ public class PoolingFeatureTest { public void testAfterCreate_NotEnabled() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controllerDisabled)); assertTrue(managers.isEmpty()); @@ -194,7 +192,7 @@ public class PoolingFeatureTest { public void testAfterCreate_PropertyEx() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); pool.afterCreate(controllerException); } @@ -202,9 +200,9 @@ public class PoolingFeatureTest { @Test(expected = PoolingFeatureRtException.class) public void testAfterCreate_NoProps() { pool = new PoolingFeature(); - + // did not perform globalInit, which is an error - + pool.afterCreate(controller1); } @@ -212,7 +210,7 @@ public class PoolingFeatureTest { public void testAfterCreate_NoFeatProps() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controllerUnknown)); assertTrue(managers.isEmpty()); @@ -492,4 +490,31 @@ public class PoolingFeatureTest { pool.afterStop(controller1); } + private Properties initProperties() { + Properties props = new Properties(); + + initProperties(props, "A", 0); + initProperties(props, "B", 1); + initProperties(props, "Exception", 2); + + props.setProperty("pooling.controllerDisabled.enabled", "false"); + + props.setProperty("pooling.controllerException.offline.queue.limit", "INVALID NUMBER"); + + return props; + } + + private void initProperties(Properties props, String suffix, int offset) { + props.setProperty("pooling.controller" + suffix + ".topic", "topic." + suffix); + props.setProperty("pooling.controller" + suffix + ".enabled", "true"); + props.setProperty("pooling.controller" + suffix + ".offline.queue.limit", String.valueOf(5 + offset)); + props.setProperty("pooling.controller" + suffix + ".offline.queue.age.milliseconds", + String.valueOf(100 + offset)); + props.setProperty("pooling.controller" + suffix + ".start.heartbeat.milliseconds", String.valueOf(10 + offset)); + props.setProperty("pooling.controller" + suffix + ".reactivate.milliseconds", String.valueOf(20 + offset)); + props.setProperty("pooling.controller" + suffix + ".identification.milliseconds", String.valueOf(30 + offset)); + props.setProperty("pooling.controller" + suffix + ".active.heartbeat.milliseconds", + String.valueOf(40 + offset)); + props.setProperty("pooling.controller" + suffix + ".inter.heartbeat.milliseconds", String.valueOf(50 + offset)); + } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 01ee61ef..e32fa545 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -25,7 +25,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -40,7 +41,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -71,6 +71,7 @@ public class PoolingManagerImplTest { protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1; protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1; protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1; + protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1; private static final String HOST2 = "other.host"; @@ -85,8 +86,8 @@ public class PoolingManagerImplTest { private static final String REQUEST_ID = "my.request.id"; /** - * Number of dmaap.publish() invocations that should be issued when the - * manager is started. + * Number of dmaap.publish() invocations that should be issued when the manager is + * started. */ private static final int START_PUB = 1; @@ -135,6 +136,7 @@ public class PoolingManagerImplTest { when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS); when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS); when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS); + when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS); futures = new LinkedList<>(); ser = new Serializer(); @@ -180,11 +182,6 @@ public class PoolingManagerImplTest { mgr = new PoolingManagerImpl(controller, poolProps); } - @After - public void tearDown() throws Exception { - - } - @Test public void testPoolingManagerImpl() { mgr = new PoolingManagerImpl(controller, poolProps); @@ -199,8 +196,8 @@ public class PoolingManagerImplTest { @Test public void testPoolingManagerImpl_ClassEx() { /* - * this controller does not implement TopicListener, which should cause - * a ClassCastException + * this controller does not implement TopicListener, which should cause a + * ClassCastException */ PolicyController ctlr = mock(PolicyController.class); @@ -316,6 +313,7 @@ public class PoolingManagerImplTest { verify(dmaap).stopConsumer(mgr); verify(sched).shutdownNow(); + verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); } @@ -362,7 +360,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -376,7 +374,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue, never()).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -511,7 +509,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -539,7 +537,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -608,8 +606,7 @@ public class PoolingManagerImplTest { StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -985,14 +982,35 @@ public class PoolingManagerImplTest { } @Test + public void testInject_Ex() throws Exception { + startMgr(); + + // route the message to this host + mgr.startDistributing(makeAssignments(true)); + + // generate RuntimeException when onTopicEvent() is invoked + doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any()); + + CountDownLatch latch = catchRecursion(true); + + Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); + mgr.handle(msg); + + verify(dmaap, times(START_PUB)).publish(any()); + verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); + + // ensure we made it past both beforeXxx() methods + assertEquals(0, latch.getCount()); + } + + @Test public void testHandleInternal() throws Exception { startMgr(); StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -1022,8 +1040,8 @@ public class PoolingManagerImplTest { Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); /* - * do NOT set the channel - this will cause the message to be invalid, - * triggering an exception + * do NOT set the channel - this will cause the message to be invalid, triggering + * an exception */ String msg = ser.encodeMsg(hb); @@ -1068,7 +1086,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to this host - mgr.startDistributing(makeAssignments(true)); + assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS)); // all of the events should have been processed locally verify(dmaap, times(START_PUB)).publish(any()); @@ -1088,7 +1106,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to the OTHER host - mgr.startDistributing(makeAssignments(false)); + assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS)); // all of the events should have been forwarded verify(dmaap, times(4)).publish(any()); @@ -1140,7 +1158,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1163,7 +1181,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1208,15 +1226,14 @@ public class PoolingManagerImplTest { } /** - * Configure the mock controller to act like a real controller, invoking - * beforeOffer and then beforeInsert, so we can make sure they pass through. - * We'll keep count to ensure we don't get into infinite recursion. + * Configure the mock controller to act like a real controller, invoking beforeOffer + * and then beforeInsert, so we can make sure they pass through. We'll keep count to + * ensure we don't get into infinite recursion. * - * @param invokeBeforeInsert {@code true} if beforeInsert() should be - * invoked, {@code false} if it should be skipped + * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked, + * {@code false} if it should be skipped * - * @return a latch that will be counted down if both beforeXxx() methods - * return false + * @return a latch that will be counted down if both beforeXxx() methods return false */ private CountDownLatch catchRecursion(boolean invokeBeforeInsert) { CountDownLatch recursion = new CountDownLatch(3); @@ -1230,9 +1247,9 @@ public class PoolingManagerImplTest { } int iarg = 0; - CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class); - String topic = args.getArgumentAt(iarg++, String.class); - String event = args.getArgumentAt(iarg++, String.class); + CommInfrastructure proto = args.getArgument(iarg++); + String topic = args.getArgument(iarg++); + String event = args.getArgument(iarg++); if (mgr.beforeOffer(proto, topic, event)) { return null; @@ -1253,9 +1270,8 @@ public class PoolingManagerImplTest { /** * Makes an assignment with two buckets. * - * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the - * manager's bucket, {@code false} if it should hash to the other - * host's bucket + * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the + * manager's bucket, {@code false} if it should hash to the other host's bucket * @return a new bucket assignment */ private BucketAssignments makeAssignments(boolean sameHost) { diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java index 63eb59d4..2d734c1c 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java @@ -30,6 +30,7 @@ import static org.onap.policy.drools.pooling.PoolingProperties.IDENTIFICATION_MS import static org.onap.policy.drools.pooling.PoolingProperties.INTER_HEARTBEAT_MS; import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_AGE_MS; import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_LIMIT; +import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_PUB_WAIT_MS; import static org.onap.policy.drools.pooling.PoolingProperties.POOLING_TOPIC; import static org.onap.policy.drools.pooling.PoolingProperties.REACTIVATE_MS; import static org.onap.policy.drools.pooling.PoolingProperties.START_HEARTBEAT_MS; @@ -53,6 +54,7 @@ public class PoolingPropertiesTest { public static final long STD_LEADER_MS = 5000L; public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L; public static final long STD_INTER_HEARTBEAT_MS = 7000L; + public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L; private Properties plain; private PoolingProperties pooling; @@ -121,10 +123,15 @@ public class PoolingPropertiesTest { doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs()); } + @Test + public void testGetOfflinePubWaitMs() throws PropertyException { + doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs()); + } + /** - * Tests a particular property. Verifies that the correct value is returned - * if the specialized property has a value or the property has no value. - * Also verifies that the property name can be generalized. + * Tests a particular property. Verifies that the correct value is returned if the + * specialized property has a value or the property has no value. Also verifies that + * the property name can be generalized. * * @param propnm name of the property of interest * @param specValue expected specialized value @@ -140,8 +147,8 @@ public class PoolingPropertiesTest { assertEquals("special " + propnm, specValue, func.apply(null)); /* - * Ensure the property supports generalization - this will throw an - * exception if it does not. + * Ensure the property supports generalization - this will throw an exception if + * it does not. */ assertFalse(propnm.equals(generalize(propnm))); @@ -155,8 +162,8 @@ public class PoolingPropertiesTest { } /** - * Makes a set of properties, where all of the properties are specialized - * for the controller. + * Makes a set of properties, where all of the properties are specialized for the + * controller. * * @return a new property set */ @@ -172,6 +179,7 @@ public class PoolingPropertiesTest { props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS); props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS); props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS); + props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS); return props; } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java index 8b495099..505dc400 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java @@ -49,8 +49,8 @@ public class SpecPropertiesTest { private static final String PREFIX_SPEC = PREFIX_GEN + MY_SPEC + "."; /** - * Suffix to add to property names to generate names of properties that are - * not populated. + * Suffix to add to property names to generate names of properties that are not + * populated. */ private static final String SUFFIX = ".suffix"; @@ -175,6 +175,16 @@ public class SpecPropertiesTest { assertNull(props.getProperty(gen(PROP_UNKNOWN), null)); } + @Test(expected = UnsupportedOperationException.class) + public void testHashCode() { + props.hashCode(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testEquals() { + props.equals(props); + } + private String gen(String propnm) { if (propnm.startsWith(PREFIX_SPEC)) { return PREFIX_GEN + propnm.substring(PREFIX_SPEC.length()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties index a4b5bc76..3273a21e 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties @@ -1,3 +1,16 @@ +# Copyright 2018 AT&T Intellectual Property. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. pooling.controllerA.topic = topic.A pooling.controllerA.enabled = true diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java index ef03d4d6..c14e8dba 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; import org.junit.Test; @@ -196,16 +197,21 @@ public class BucketAssignmentsTest { String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"}; asgn.setHostArray(arr); - for (int x = 0; x < arr.length; ++x) { - assertEquals("x=" + x, arr[x], asgn.getAssignedHost(x)); + /* + * get assignments for consecutive integers, including negative numbers and + * numbers extending past the length of the array. + * + */ + TreeSet<String> seen = new TreeSet<>(); + for (int x = -1; x < arr.length + 2; ++x) { + seen.add(asgn.getAssignedHost(x)); } - // negative - assertNull(asgn.getAssignedHost(-1)); + TreeSet<String> expected = new TreeSet<>(Arrays.asList(arr)); + assertEquals(expected, seen); - // beyond end - assertNull(asgn.getAssignedHost(arr.length)); - assertNull(asgn.getAssignedHost(arr.length + 1)); + // try a much bigger number + assertNotNull(asgn.getAssignedHost(arr.length * 1000)); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java deleted file mode 100644 index 428b5853..00000000 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.message; - -import org.junit.Test; -import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; -import com.fasterxml.jackson.databind.ObjectMapper; - -public class Trial { - - @Test - public void test() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - - Message msg = new Forward("me", CommInfrastructure.DMAAP, "my topic", "a message", "my req"); - - String enc = mapper.writeValueAsString(msg); - System.out.println("enc=" + enc); - - Message msg2 = mapper.readValue(enc, Message.class); - System.out.println("class=" + msg2.getClass()); - } - -} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index 7997a4ee..7b4b0602 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -24,9 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -39,11 +39,12 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; +import org.onap.policy.drools.utils.Triple; public class ActiveStateTest extends BasicStateTester { @@ -65,7 +66,7 @@ public class ActiveStateTest extends BasicStateTester { // ensure a heart beat was generated Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -183,8 +184,8 @@ public class ActiveStateTest extends BasicStateTester { /* * - * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader - * thus should be ignored. + * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus + * should be ignored. */ assertNull(state.process(new Offline(PREV_HOST2))); } @@ -196,22 +197,57 @@ public class ActiveStateTest extends BasicStateTester { state = new ActiveState(mgr); /* - * HOST1 has buckets, but it isn't the leader and it isn't my - * predecessor, thus should be ignored. + * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus + * should be ignored. */ assertNull(state.process(new Offline(HOST1))); } @Test - public void testProcessQuery() { + public void testProcessLeader_Invalid() { + Leader msg = new Leader(PREV_HOST, null); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_BadLeader() { + String[] arr = {HOST2, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + + // now send a Leader message for that leader + Leader msg = new Leader(HOST1, asgn); + State next = mock(State.class); when(mgr.goQuery()).thenReturn(next); - assertEquals(next, state.process(new Query())); + // should go Query, but not start distributing + assertEquals(next, state.process(msg)); + verify(mgr, never()).startDistributing(asgn); + } + + @Test + public void testProcessLeader_GoodLeader() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + // now send a Leader message for that leader + Leader msg = new Leader(PREV_HOST, asgn); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - Identification ident = captureAdminMessage(Identification.class); - assertEquals(MY_HOST, ident.getSource()); - assertEquals(ASGN3, ident.getAssignments()); + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); } @Test @@ -256,24 +292,24 @@ public class ActiveStateTest extends BasicStateTester { // invoke start() to add the timers state.start(); - assertEquals(3, repeatedFutures.size()); + assertEquals(3, repeatedSchedules.size()); Triple<Long, Long, StateTimerTask> timer; // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); // predecessor's heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -285,19 +321,19 @@ public class ActiveStateTest extends BasicStateTester { // invoke start() to add the timers state.start(); - assertEquals(2, repeatedFutures.size()); + assertEquals(2, repeatedSchedules.size()); Triple<Long, Long, StateTimerTask> timer; // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -314,14 +350,14 @@ public class ActiveStateTest extends BasicStateTester { verify(mgr).publish(anyString(), any(Heartbeat.class)); // fire the task - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); // should have generated a second pair of heart beats verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class)); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -339,7 +375,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goInactive()).thenReturn(next); // fire the task - should not transition - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); verify(mgr, never()).publishAdmin(any(Query.class)); } @@ -356,7 +392,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goInactive()).thenReturn(next); // fire the task - should transition - assertEquals(next, task.third.fire(null)); + assertEquals(next, task.third().fire()); // should indicate failure verify(mgr).internalTopicFailed(); @@ -381,7 +417,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goQuery()).thenReturn(next); // fire the task - should NOT transition - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); verify(mgr, never()).publishAdmin(any(Query.class)); } @@ -398,7 +434,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goQuery()).thenReturn(next); // fire the task - should transition - assertEquals(next, task.third.fire(null)); + assertEquals(next, task.third().fire()); verify(mgr).publishAdmin(any(Query.class)); } @@ -414,8 +450,8 @@ public class ActiveStateTest extends BasicStateTester { verify(mgr, times(1)).publish(any(), any()); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -429,13 +465,13 @@ public class ActiveStateTest extends BasicStateTester { // this message should go to itself msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); // this message should go to its successor msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(HOST1, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(HOST1, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java index e48742f7..75ca7564 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java @@ -21,9 +21,9 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,16 +33,18 @@ import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.utils.Pair; +import org.onap.policy.drools.utils.Triple; /** - * Superclass used to test subclasses of {@link Message}. + * Superclass used to test subclasses of {@link State}. */ public class BasicStateTester { @@ -74,9 +76,9 @@ public class BasicStateTester { protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3); /** - * Futures returned by schedule(). + * Scheduled tasks returned by schedule(). */ - protected LinkedList<ScheduledFuture<?>> onceFutures; + protected LinkedList<CancellableScheduledTask> onceSchedules; /** * Tasks captured via schedule(). @@ -84,9 +86,9 @@ public class BasicStateTester { protected LinkedList<Pair<Long, StateTimerTask>> onceTasks; /** - * Futures returned by scheduleWithFixedDelay(). + * Scheduled tasks returned by scheduleWithFixedDelay(). */ - protected LinkedList<ScheduledFuture<?>> repeatedFutures; + protected LinkedList<CancellableScheduledTask> repeatedSchedules; /** * Tasks captured via scheduleWithFixedDelay(). @@ -112,10 +114,10 @@ public class BasicStateTester { } public void setUp() throws Exception { - onceFutures = new LinkedList<>(); + onceSchedules = new LinkedList<>(); onceTasks = new LinkedList<>(); - repeatedFutures = new LinkedList<>(); + repeatedSchedules = new LinkedList<>(); repeatedTasks = new LinkedList<>(); published = new LinkedList<>(); @@ -162,9 +164,9 @@ public class BasicStateTester { Object[] args = invocation.getArguments(); onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1])); - ScheduledFuture<?> fut = mock(ScheduledFuture.class); - onceFutures.add(fut); - return fut; + CancellableScheduledTask sched = mock(CancellableScheduledTask.class); + onceSchedules.add(sched); + return sched; }); // capture scheduleWithFixedDelay() arguments, and return a new future @@ -172,9 +174,9 @@ public class BasicStateTester { Object[] args = invocation.getArguments(); repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2])); - ScheduledFuture<?> fut = mock(ScheduledFuture.class); - repeatedFutures.add(fut); - return fut; + CancellableScheduledTask sched = mock(CancellableScheduledTask.class); + repeatedSchedules.add(sched); + return sched; }); // get/set assignments in the manager @@ -183,7 +185,7 @@ public class BasicStateTester { when(mgr.getAssignments()).thenAnswer(args -> asgn.get()); doAnswer(args -> { - asgn.set(args.getArgumentAt(0, BucketAssignments.class)); + asgn.set(args.getArgument(0)); return null; }).when(mgr).startDistributing(any()); } @@ -199,8 +201,7 @@ public class BasicStateTester { } /** - * Captures the host array from the Leader message published to the admin - * channel. + * Captures the host array from the Leader message published to the admin channel. * * @return the host array, as a list */ @@ -209,8 +210,7 @@ public class BasicStateTester { } /** - * Captures the host array from the Leader message published to the admin - * channel. + * Captures the host array from the Leader message published to the admin channel. * * @return the host array */ @@ -224,8 +224,7 @@ public class BasicStateTester { } /** - * Captures the assignments from the Leader message published to the admin - * channel. + * Captures the assignments from the Leader message published to the admin channel. * * @return the bucket assignments */ @@ -277,42 +276,6 @@ public class BasicStateTester { */ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) { Pair<String, Message> msg = published.get(index); - return new Pair<>(msg.first, clazz.cast(msg.second)); + return new Pair<>(msg.first(), clazz.cast(msg.second())); } - - /** - * Pair of values. - * - * @param <F> first value's type - * @param <S> second value's type - */ - public static class Pair<F, S> { - public final F first; - public final S second; - - public Pair(F first, S second) { - this.first = first; - this.second = second; - } - } - - /** - * Pair of values. - * - * @param <F> first value's type - * @param <S> second value's type - * @param <T> third value's type - */ - public static class Triple<F, S, T> { - public final F first; - public final S second; - public final T third; - - public Triple(F first, S second, T third) { - this.first = first; - this.second = second; - this.third = third; - } - } - } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java index 96c59719..95cbe753 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java @@ -61,12 +61,6 @@ public class IdleStateTest extends BasicStateTester { } @Test - public void testStop() { - state.stop(); - verifyNothingPublished(); - } - - @Test public void testProcessForward() { Forward msg = new Forward(); assertNull(state.process(msg)); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java index 48d5b1ed..394adaee 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java @@ -28,6 +28,7 @@ import java.util.Map; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.utils.Pair; public class InactiveStateTest extends BasicStateTester { @@ -62,20 +63,20 @@ public class InactiveStateTest extends BasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue()); + assertEquals(STD_REACTIVATE_WAIT_MS, timer.first().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goStart()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); } @Test public void testInactiveState() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java index d60ad2ea..e1718418 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,16 +38,19 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.pooling.state.ProcessingState.HostBucket; public class ProcessingStateTest extends BasicStateTester { private ProcessingState state; + private HostBucket hostBucket; @Before public void setUp() throws Exception { super.setUp(); state = new ProcessingState(mgr, MY_HOST); + hostBucket = new HostBucket(MY_HOST); } @Test @@ -62,6 +65,39 @@ public class ProcessingStateTest extends BasicStateTester { } @Test + public void testGoActive_WithAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(act, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + } + + @Test + public void testGoActive_WithoutAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(inact, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + + } + + @Test public void testProcessQuery() { State next = mock(State.class); when(mgr.goQuery()).thenReturn(next); @@ -97,8 +133,8 @@ public class ProcessingStateTest extends BasicStateTester { state = new ProcessingState(mgr, LEADER); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); @@ -260,8 +296,8 @@ public class ProcessingStateTest extends BasicStateTester { @Test public void testMakeBucketArray() { /* - * All hosts are still alive, so it should have the exact same - * assignments as it had to start. + * All hosts are still alive, so it should have the exact same assignments as it + * had to start. */ state.setAssignments(ASGN3); state.becomeLeader(sortHosts(HOST_ARR3)); @@ -325,4 +361,80 @@ public class ProcessingStateTest extends BasicStateTester { assertEquals(Arrays.asList(expected), captureHostList()); } + @Test + public void testHostBucketRemove_testHostBucketAdd_testHostBucketSize() { + assertEquals(0, hostBucket.size()); + + hostBucket.add(20); + hostBucket.add(30); + hostBucket.add(40); + assertEquals(3, hostBucket.size()); + + assertEquals(20, hostBucket.remove().intValue()); + assertEquals(30, hostBucket.remove().intValue()); + assertEquals(1, hostBucket.size()); + + // add more before taking the last item + hostBucket.add(50); + hostBucket.add(60); + assertEquals(3, hostBucket.size()); + + assertEquals(40, hostBucket.remove().intValue()); + assertEquals(50, hostBucket.remove().intValue()); + assertEquals(60, hostBucket.remove().intValue()); + assertEquals(0, hostBucket.size()); + + // add more AFTER taking the last item + hostBucket.add(70); + assertEquals(70, hostBucket.remove().intValue()); + assertEquals(0, hostBucket.size()); + } + + @Test + public void testHostBucketCompareTo() { + HostBucket hb1 = new HostBucket(PREV_HOST); + HostBucket hb2 = new HostBucket(MY_HOST); + + assertEquals(0, hb1.compareTo(hb1)); + assertEquals(0, hb1.compareTo(new HostBucket(PREV_HOST))); + + // both empty + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + + // hb1 has one bucket, so it should not be larger + hb1.add(100); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // hb1 has two buckets, so it should still be larger + hb1.add(200); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // hb1 has two buckets, hb2 has one, so hb1 should still be larger + hb2.add(1000); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // same number of buckets, so hb2 should now be larger + hb2.add(2000); + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + + // hb2 has more buckets, it should still be larger + hb2.add(3000); + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + } + + @Test(expected = UnsupportedOperationException.class) + public void testHostBucketHashCode() { + hostBucket.hashCode(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testHostBucketEquals() { + hostBucket.equals(hostBucket); + } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index d714d5cc..a7c3a3d5 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -25,9 +25,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; @@ -38,7 +39,7 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; public class QueryStateTest extends BasicStateTester { @@ -63,44 +64,48 @@ public class QueryStateTest extends BasicStateTester { } @Test + public void testGoQuery() { + assertNull(state.goQuery()); + } + + @Test public void testStart() { state.start(); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); } @Test - public void testGoQuery() { - assertNull(state.process(new Query())); - assertEquals(ASGN3, state.getAssignments()); - } + public void testProcessIdentification_SameSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - @Test - public void testProcessIdentification_NullSource() { - assertNull(state.process(new Identification())); + assertNull(state.process(new Identification(MY_HOST, asgn))); + // info should be unchanged assertEquals(MY_HOST, state.getLeader()); + verify(mgr, never()).startDistributing(asgn); } @Test - public void testProcessIdentification_NewLeader() { - assertNull(state.process(new Identification(PREV_HOST, null))); - - assertEquals(PREV_HOST, state.getLeader()); - } + public void testProcessIdentification_DiffSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - @Test - public void testProcessIdentification_NotNewLeader() { - assertNull(state.process(new Identification(HOST2, null))); + assertNull(state.process(new Identification(HOST2, asgn))); + // leader should be unchanged assertEquals(MY_HOST, state.getLeader()); + + // should have picked up the assignments + verify(mgr).startDistributing(asgn); } @Test - public void testProcessLeader_NullAssignment() { + public void testProcessLeader_Invalid() { Leader msg = new Leader(PREV_HOST, null); // should stay in the same state, and not start distributing @@ -115,67 +120,55 @@ public class QueryStateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullSource() { + public void testProcessLeader_SameLeader() { String[] arr = {HOST2, PREV_HOST, MY_HOST}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(null, asgn); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + // identify a leader that's better than my host + assertEquals(null, state.process(new Identification(PREV_HOST, asgn))); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); - } + // now send a Leader message for that leader + Leader msg = new Leader(PREV_HOST, asgn); - @Test - public void testProcessLeader_SourceIsNotAssignmentLeader() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; - BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(HOST2, asgn); + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); + // should go Active and start distributing + assertEquals(next, state.process(msg)); verify(mgr, never()).goInactive(); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); + // Ident msg + Leader msg = times(2) + verify(mgr, times(2)).startDistributing(asgn); } @Test - public void testProcessLeader_EmptyAssignment() { - Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + public void testProcessLeader_BetterLeaderWithAssignment() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(PREV_HOST, asgn); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + verify(mgr, never()).goInactive(); } @Test - public void testProcessLeader_BetterLeader() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; + public void testProcessLeader_BetterLeaderWithoutAssignment() { + String[] arr = {HOST2, PREV_HOST, HOST1}; BucketAssignments asgn = new BucketAssignments(arr); Leader msg = new Leader(PREV_HOST, asgn); State next = mock(State.class); - when(mgr.goActive()).thenReturn(next); + when(mgr.goInactive()).thenReturn(next); - // should go Active and start distributing + // should go Inactive, but start distributing assertEquals(next, state.process(msg)); verify(mgr).startDistributing(asgn); - verify(mgr, never()).goInactive(); + verify(mgr, never()).goActive(); } @Test @@ -241,41 +234,48 @@ public class QueryStateTest extends BasicStateTester { } @Test - public void testProcessQuery() { - BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); - mgr.startDistributing(asgn); - state = new QueryState(mgr); - - State next = mock(State.class); - when(mgr.goQuery()).thenReturn(next); - - assertEquals(null, state.process(new Query())); - - verify(mgr).publishAdmin(any(Identification.class)); - } - - @Test public void testQueryState() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @Test + public void testAwaitIdentification_MissingSelfIdent() { + state.start(); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); + + // should published an Offline message and go inactive + + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + assertEquals(next, timer.second().fire()); + + Offline msg = captureAdminMessage(Offline.class); + assertEquals(MY_HOST, msg.getSource()); + } + + @Test public void testAwaitIdentification_Leader() { state.start(); + state.process(new Identification(MY_HOST, null)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should have published a Leader message Leader msg = captureAdminMessage(Leader.class); @@ -291,20 +291,21 @@ public class QueryStateTest extends BasicStateTester { state = new QueryState(mgr); state.start(); + state.process(new Identification(MY_HOST, null)); // tell it the leader is still active state.process(new Identification(PREV_HOST, asgn)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); // set up active state, as that's what it should return State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); @@ -321,20 +322,21 @@ public class QueryStateTest extends BasicStateTester { state = new QueryState(mgr); state.start(); + state.process(new Identification(MY_HOST, null)); // tell it the leader is still active state.process(new Identification(PREV_HOST, asgn)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); // set up inactive state, as that's what it should return State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index f29d2348..af4e8f13 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,6 +38,7 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; public class StartStateTest extends BasicStateTester { @@ -74,18 +75,18 @@ public class StartStateTest extends BasicStateTester { Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs()); + assertEquals(MY_HOST, msg.first()); + assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs()); Pair<Long, StateTimerTask> timer = onceTasks.removeFirst(); - assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue()); + assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); verify(mgr).internalTopicFailed(); } @@ -93,8 +94,8 @@ public class StartStateTest extends BasicStateTester { @Test public void testStartStatePoolingManager() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -105,8 +106,8 @@ public class StartStateTest extends BasicStateTester { state = new StartState(mgr); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index 1be48e21..a184dfad 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -21,16 +21,18 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; -import java.util.concurrent.ScheduledFuture; import org.junit.Before; import org.junit.Test; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; @@ -55,8 +57,8 @@ public class StateTest extends BasicStateTester { @Test public void testStatePoolingManager() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -67,8 +69,8 @@ public class StateTest extends BasicStateTester { state = new MyState(mgr); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -98,13 +100,13 @@ public class StateTest extends BasicStateTester { verify(mgr).schedule(delay, task2); verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3); - ScheduledFuture<?> fut1 = onceFutures.removeFirst(); - ScheduledFuture<?> fut2 = onceFutures.removeFirst(); - ScheduledFuture<?> fut3 = repeatedFutures.removeFirst(); + CancellableScheduledTask sched1 = onceSchedules.removeFirst(); + CancellableScheduledTask sched2 = onceSchedules.removeFirst(); + CancellableScheduledTask sched3 = repeatedSchedules.removeFirst(); - verify(fut1, never()).cancel(false); - verify(fut2, never()).cancel(false); - verify(fut3, never()).cancel(false); + verify(sched1, never()).cancel(); + verify(sched2, never()).cancel(); + verify(sched3, never()).cancel(); /* * Cancel the timers. @@ -112,9 +114,9 @@ public class StateTest extends BasicStateTester { state.cancelTimers(); // verify that all were cancelled - verify(fut1).cancel(false); - verify(fut2).cancel(false); - verify(fut3).cancel(false); + verify(sched1).cancel(); + verify(sched2).cancel(); + verify(sched3).cancel(); } @Test @@ -134,13 +136,6 @@ public class StateTest extends BasicStateTester { } @Test - public void testStop() { - state.stop(); - - assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource()); - } - - @Test public void testGoStart() { State next = mock(State.class); when(mgr.goStart()).thenReturn(next); @@ -195,7 +190,18 @@ public class StateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullAssignment() { + public void testProcessLeader() { + String[] arr = {HOST2, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(HOST1, asgn); + + // should ignore it + assertEquals(null, state.process(msg)); + verify(mgr).startDistributing(asgn); + } + + @Test + public void testProcessLeader_Invalid() { Leader msg = new Leader(PREV_HOST, null); // should stay in the same state, and not start distributing @@ -206,57 +212,44 @@ public class StateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullSource() { + public void testIsValidLeader_NullAssignment() { + assertFalse(state.isValid(new Leader(PREV_HOST, null))); + } + + @Test + public void testIsValidLeader_NullSource() { String[] arr = {HOST2, PREV_HOST, MY_HOST}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(null, asgn); + assertFalse(state.isValid(new Leader(null, asgn))); + } - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + @Test + public void testIsValidLeader_EmptyAssignment() { + assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments()))); } @Test - public void testProcessLeader_EmptyAssignment() { - Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + public void testIsValidLeader_FromSelf() { + String[] arr = {HOST2, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + assertFalse(state.isValid(new Leader(MY_HOST, asgn))); } @Test - public void testProcessLeader_MyHostAssigned() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; + public void testIsValidLeader_WrongLeader() { + String[] arr = {HOST2, HOST3}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(PREV_HOST, asgn); - - State next = mock(State.class); - when(mgr.goActive()).thenReturn(next); - // should go Active and start distributing - assertEquals(next, state.process(msg)); - verify(mgr).startDistributing(asgn); - verify(mgr, never()).goInactive(); + assertFalse(state.isValid(new Leader(HOST1, asgn))); } @Test - public void testProcessLeader_MyHostUnassigned() { + public void testIsValidLeader() { String[] arr = {HOST2, HOST1}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(HOST1, asgn); - State next = mock(State.class); - when(mgr.goInactive()).thenReturn(next); - - // should go Inactive and start distributing - assertEquals(next, state.process(msg)); - verify(mgr).startDistributing(asgn); - verify(mgr, never()).goActive(); + assertTrue(state.isValid(new Leader(HOST1, asgn))); } @Test @@ -344,18 +337,18 @@ public class StateTest extends BasicStateTester { state.schedule(delay, task); - ScheduledFuture<?> fut = onceFutures.removeFirst(); + CancellableScheduledTask sched = onceSchedules.removeFirst(); // scheduled, but not canceled yet verify(mgr).schedule(delay, task); - verify(fut, never()).cancel(false); + verify(sched, never()).cancel(); /* - * Ensure the state added the timer to its list by telling it to cancel - * its timers and then seeing if this timer was canceled. + * Ensure the state added the timer to its list by telling it to cancel its timers + * and then seeing if this timer was canceled. */ state.cancelTimers(); - verify(fut).cancel(false); + verify(sched).cancel(); } @Test @@ -367,18 +360,18 @@ public class StateTest extends BasicStateTester { state.scheduleWithFixedDelay(initdel, delay, task); - ScheduledFuture<?> fut = repeatedFutures.removeFirst(); + CancellableScheduledTask sched = repeatedSchedules.removeFirst(); // scheduled, but not canceled yet verify(mgr).scheduleWithFixedDelay(initdel, delay, task); - verify(fut, never()).cancel(false); + verify(sched, never()).cancel(); /* - * Ensure the state added the timer to its list by telling it to cancel - * its timers and then seeing if this timer was canceled. + * Ensure the state added the timer to its list by telling it to cancel its timers + * and then seeing if this timer was canceled. */ state.cancelTimers(); - verify(fut).cancel(false); + verify(sched).cancel(); } @Test |