diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java')
7 files changed, 149 insertions, 165 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index c35e525a..96b358da 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.tuple.Pair; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +63,6 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; -import org.onap.policy.drools.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,25 +80,21 @@ import org.slf4j.LoggerFactory; * * <p>Invoke {@link #runSlow()}, before the test, to slow things down. */ -public class FeatureTest { +public class FeatureTest { private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class); - /** * Name of the topic used for inter-host communication. */ private static final String INTERNAL_TOPIC = "my.internal.topic"; - /** * Name of the topic from which "external" events "arrive". */ private static final String EXTERNAL_TOPIC = "my.external.topic"; - /** * Name of the controller. */ private static final String CONTROLLER1 = "controller.one"; - private static long stdReactivateWaitMs = 200; private static long stdIdentificationMs = 60; private static long stdStartHeartbeatMs = 60; @@ -111,28 +108,26 @@ public class FeatureTest { * Used to decode events from the external topic. */ private static final Gson mapper = new Gson(); - /** * Used to identify the current context. */ private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>(); - /** * Context for the current test case. */ private Context ctx; - /** * Setup. */ + @Before public void setUp() { ctx = null; } - /** * Tear down. */ + @After public void tearDown() { if (ctx != null) { @@ -157,19 +152,14 @@ public class FeatureTest { private void run(int nmessages, int nhosts) throws Exception { ctx = new Context(nmessages); - for (int x = 0; x < nhosts; ++x) { ctx.addHost(); } - ctx.startHosts(); - for (int x = 0; x < nmessages; ++x) { ctx.offerExternal(makeMessage(x)); } - ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS); - assertEquals(0, ctx.getDecodeErrors()); assertEquals(0, ctx.getRemainingEvents()); ctx.checkAllSawAMsg(); @@ -178,10 +168,10 @@ public class FeatureTest { private String makeMessage(int reqnum) { return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; } - /** * Invoke this to slow the timers down. */ + protected static void runSlow() { stdReactivateWaitMs = 10000; stdIdentificationMs = 10000; @@ -193,58 +183,50 @@ public class FeatureTest { stdInterPollMs = 2000; stdEventWaitSec = 1000; } - /** * Decodes an event. * * @param event event * @return the decoded event, or {@code null} if it cannot be decoded */ + private static Object decodeEvent(String event) { try { return mapper.fromJson(event, TreeMap.class); - } catch (JsonParseException e) { logger.warn("cannot decode external event", e); return null; } } - /** * Context used for a single test case. */ - private static class Context { + private static class Context { /** * Hosts that have been added to this context. */ private final Deque<Host> hosts = new LinkedList<>(); - /** * Maps a drools controller to its policy controller. */ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); - /** * Maps a channel to its queue. Does <i>not</i> include the "admin" channel. */ private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7); - /** * Queue for the external "DMaaP" topic. */ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>(); - /** * Counts the number of decode errors. */ private final AtomicInteger numDecodeErrors = new AtomicInteger(0); - /** * Number of events we're still waiting to receive. */ private final CountDownLatch eventCounter; - /** * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by * {@link #getCurrentHost()}. @@ -256,13 +238,14 @@ public class FeatureTest { * * @param nEvents number of events to be processed */ + public Context(int events) { eventCounter = new CountDownLatch(events); } - /** * Destroys the context, stopping any hosts that remain. */ + public void destroy() { stopHosts(); hosts.clear(); @@ -273,16 +256,17 @@ public class FeatureTest { * * @return the new Host */ + public Host addHost() { Host host = new Host(this); hosts.add(host); - return host; } /** * Starts the hosts. */ + public void startHosts() { hosts.forEach(host -> host.start()); } @@ -290,6 +274,7 @@ public class FeatureTest { /** * Stops the hosts. */ + public void stopHosts() { hosts.forEach(host -> host.stop()); } @@ -297,6 +282,7 @@ public class FeatureTest { /** * Verifies that all hosts processed at least one message. */ + public void checkAllSawAMsg() { int msgs = 0; for (Host host : hosts) { @@ -312,6 +298,7 @@ public class FeatureTest { * @param host host * @param func function to invoke */ + public void withHost(Host host, VoidFunction func) { currentHost = host; func.apply(); @@ -323,6 +310,7 @@ public class FeatureTest { * * @param event event */ + public void offerExternal(String event) { externalTopic.offer(event); } @@ -333,6 +321,7 @@ public class FeatureTest { * @param channel channel * @param queue the channel's queue */ + public void addInternal(String channel, BlockingQueue<String> queue) { channel2queue.put(channel, queue); } @@ -342,6 +331,7 @@ public class FeatureTest { * * @param message message */ + public void offerInternal(String message) { channel2queue.values().forEach(queue -> queue.offer(message)); } @@ -352,6 +342,7 @@ public class FeatureTest { * @param channel channel * @param message message */ + public void offerInternal(String channel, String message) { BlockingQueue<String> queue = channel2queue.get(channel); if (queue != null) { @@ -365,6 +356,7 @@ public class FeatureTest { * @param controller controller * @param droolsController drools controller */ + public void addController(PolicyController controller, DroolsController droolsController) { drools2policy.put(droolsController, controller); } @@ -376,6 +368,7 @@ public class FeatureTest { * @return the controller associated with a drools controller, or {@code null} if * it has no associated controller */ + public PolicyController getController(DroolsController droolsController) { return drools2policy.get(droolsController); } @@ -385,6 +378,7 @@ public class FeatureTest { * * @return queue for the external topic */ + public BlockingQueue<String> getExternalTopic() { return externalTopic; } @@ -394,6 +388,7 @@ public class FeatureTest { * * @return the number of decode errors so far */ + public int getDecodeErrors() { return numDecodeErrors.get(); } @@ -401,6 +396,7 @@ public class FeatureTest { /** * Increments the count of decode errors. */ + public void bumpDecodeErrors() { numDecodeErrors.incrementAndGet(); } @@ -410,6 +406,7 @@ public class FeatureTest { * * @return the number of events that haven't been processed */ + public long getRemainingEvents() { return eventCounter.getCount(); } @@ -417,6 +414,7 @@ public class FeatureTest { /** * Adds an event to the counter. */ + public void addEvent() { eventCounter.countDown(); } @@ -429,6 +427,7 @@ public class FeatureTest { * @return {@code true} if all events have been processed, {@code false} otherwise * @throws InterruptedException throws interrupted */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { return eventCounter.await(time, units); } @@ -439,6 +438,7 @@ public class FeatureTest { * * @return the current host, or {@code null} if there is no current host */ + public Host getCurrentHost() { return currentHost; } @@ -447,25 +447,27 @@ public class FeatureTest { /** * Simulates a single "host". */ - private static class Host { + private static class Host { private final Context context; - private final PoolingFeature feature; /** * {@code True} if this host has processed a message, {@code false} otherwise. */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); /** * This host's internal "DMaaP" topic. */ + private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(); /** * Source that reads from the external topic and posts to the listener. */ + private TopicSource externalSource; // mock objects @@ -478,25 +480,20 @@ public class FeatureTest { * * @param context context */ + public Host(Context context) { this.context = context; - when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { externalSource.unregister(controller); return true; }); - doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); - context.addController(controller, drools); - // arrange to read from the external topic externalSource = new TopicSourceImpl(context, false); - feature = new PoolingFeatureImpl(context); } @@ -505,6 +502,7 @@ public class FeatureTest { * * @return the host name */ + public String getName() { return feature.getHost(); } @@ -513,21 +511,16 @@ public class FeatureTest { * Starts threads for the host so that it begins consuming from both the external * "DMaaP" topic and its own internal "DMaaP" topic. */ - public void start() { + public void start() { context.withHost(this, () -> { - feature.beforeStart(engine); feature.afterCreate(controller); - // assign the queue for this host's internal topic context.addInternal(getName(), msgQueue); - feature.beforeStart(controller); - // start consuming events from the external topic externalSource.register(controller); - feature.afterStart(controller); }); } @@ -535,6 +528,7 @@ public class FeatureTest { /** * Stops the host's threads. */ + public void stop() { feature.beforeStop(controller); externalSource.unregister(controller); @@ -549,6 +543,7 @@ public class FeatureTest { * @param event event * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { return feature.beforeOffer(controller, protocol, topic2, event); } @@ -562,8 +557,8 @@ public class FeatureTest { * @param success success * @return {@code true} if the event was handled, {@code false} otherwise */ - public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { return feature.afterOffer(controller, protocol, topic, event, success); } @@ -573,6 +568,7 @@ public class FeatureTest { * @param fact fact * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean beforeInsert(Object fact) { return feature.beforeInsert(drools, fact); } @@ -585,6 +581,7 @@ public class FeatureTest { * controller, {@code false} otherwise * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean afterInsert(Object fact, boolean successInsert) { return feature.afterInsert(drools, fact, successInsert); } @@ -592,6 +589,7 @@ public class FeatureTest { /** * Indicates that a message was seen for this host. */ + public void sawMessage() { sawMsg.set(true); } @@ -602,6 +600,7 @@ public class FeatureTest { * @return {@code true} if a message was seen for this host, {@code false} * otherwise */ + public boolean messageSeen() { return sawMsg.get(); } @@ -611,6 +610,7 @@ public class FeatureTest { * * @return the queue associated with this host's internal topic */ + public BlockingQueue<String> getInternalQueue() { return msgQueue; } @@ -620,8 +620,8 @@ public class FeatureTest { * Listener for the external topic. Simulates the actions taken by * <i>AggregatedPolicyController.onTopicEvent</i>. */ - private static class MyExternalTopicListener implements Answer<Void> { + private static class MyExternalTopicListener implements Answer<Void> { private final Context context; private final Host host; @@ -636,30 +636,23 @@ public class FeatureTest { CommInfrastructure commType = args.getArgument(index++); String topic = args.getArgument(index++); String event = args.getArgument(index++); - if (host.beforeOffer(commType, topic, event)) { return null; } - boolean result; Object fact = decodeEvent(event); - if (fact == null) { result = false; context.bumpDecodeErrors(); - } else { result = true; - if (!host.beforeInsert(fact)) { // feature did not handle it so we handle it here host.afterInsert(fact, result); - host.sawMessage(); context.addEvent(); } } - host.afterOffer(commType, topic, event, result); return null; } @@ -670,10 +663,9 @@ public class FeatureTest { * <i>channel</i> embedded within the message. If it's the "admin" channel, then the * message is placed on all queues. */ - private static class TopicSinkImpl extends TopicImpl implements TopicSink { + private static class TopicSinkImpl extends TopicImpl implements TopicSink { private final Context context; - /** * Used to decode the messages so that the channel can be extracted. */ @@ -684,6 +676,7 @@ public class FeatureTest { * * @param context context */ + public TopicSinkImpl(Context context) { this.context = context; } @@ -693,22 +686,17 @@ public class FeatureTest { if (!isAlive()) { return false; } - try { Message msg = serializer.decodeMsg(message); String channel = msg.getChannel(); - if (Message.ADMIN.equals(channel)) { // add to every queue context.offerInternal(message); - } else { // add to a specific queue context.offerInternal(channel, message); } - return true; - } catch (JsonParseException e) { logger.warn("could not decode message: {}", message); context.bumpDecodeErrors(); @@ -720,15 +708,14 @@ public class FeatureTest { /** * Source implementation that reads from a queue associated with a topic. */ + private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource { private final String topic; - /** * Queue from which to retrieve messages. */ private final BlockingQueue<String> queue; - /** * Manages the current consumer thread. The "first" item is used as a trigger to * tell the thread to stop processing, while the "second" item is triggered <i>by @@ -743,11 +730,11 @@ public class FeatureTest { * @param internal {@code true} if to read from the internal topic, {@code false} * to read from the external topic */ + public TopicSourceImpl(Context context, boolean internal) { if (internal) { this.topic = INTERNAL_TOPIC; this.queue = context.getCurrentHost().getInternalQueue(); - } else { this.topic = EXTERNAL_TOPIC; this.queue = context.getExternalTopic(); @@ -773,33 +760,25 @@ public class FeatureTest { * Starts a thread that takes messages from the queue and gives them to the * listener. Stops the thread of any previously registered listener. */ + @Override public void register(TopicListener listener) { - Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1)); - + Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1)); reregister(newPair); - Thread thread = new Thread(() -> { - try { do { - processMessages(newPair.first(), listener); - } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS)); - + processMessages(newPair.getLeft(), listener); + } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS)); logger.info("topic source thread completed"); - } catch (InterruptedException e) { logger.warn("topic source thread aborted", e); Thread.currentThread().interrupt(); - } catch (RuntimeException e) { logger.warn("topic source thread aborted", e); } - - newPair.second().countDown(); - + newPair.getRight().countDown(); }); - thread.setDaemon(true); thread.start(); } @@ -807,6 +786,7 @@ public class FeatureTest { /** * Stops the thread of <i>any</i> currently registered listener. */ + @Override public void unregister(TopicListener listener) { reregister(null); @@ -818,6 +798,7 @@ public class FeatureTest { * * @param newPair the new "pair", or {@code null} to unregister */ + private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) { try { Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair); @@ -826,26 +807,20 @@ public class FeatureTest { // unregister was invoked twice in a row logger.warn("re-unregister for topic source"); } - // no previous thread to stop return; } - // need to stop the previous thread - // tell it to stop - oldPair.first().countDown(); - + oldPair.getLeft().countDown(); // wait for it to stop - if (!oldPair.second().await(2, TimeUnit.SECONDS)) { + if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) { logger.warn("old topic registration is still running"); } - } catch (InterruptedException e) { logger.warn("old topic registration may still be running", e); Thread.currentThread().interrupt(); } - if (newPair != null) { // register was invoked twice in a row logger.warn("re-register for topic source"); @@ -859,15 +834,13 @@ public class FeatureTest { * @param listener listener * @throws InterruptedException throws interrupted exception */ - private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { + private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) { - String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS); if (msg == null) { return; } - listener.onTopicEvent(CommInfrastructure.UEB, topic, msg); } } @@ -877,11 +850,13 @@ public class FeatureTest { * Topic implementation. Most methods just throw * {@link UnsupportedOperationException}. */ + private static class TopicImpl implements Topic { /** * Constructor. */ + public TopicImpl() { super(); } @@ -960,8 +935,8 @@ public class FeatureTest { /** * Feature with overrides. */ - private static class PoolingFeatureImpl extends PoolingFeature { + private static class PoolingFeatureImpl extends PoolingFeature { private final Context context; /** @@ -969,9 +944,9 @@ public class FeatureTest { * * @param context context */ + public PoolingFeatureImpl(Context context) { this.context = context; - /* * Note: do NOT extract anything from "context" at this point, because it * hasn't been fully initialized yet @@ -981,9 +956,7 @@ public class FeatureTest { @Override public Properties getProperties(String featName) { Properties props = new Properties(); - props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); - props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); @@ -998,7 +971,6 @@ public class FeatureTest { "" + stdActiveHeartbeatMs); props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs); - return props; } @@ -1014,6 +986,7 @@ public class FeatureTest { * @param spec specializer to be embedded * @return the property name, with the specializer embedded within it */ + private String specialize(String propnm, String spec) { String suffix = propnm.substring(PREFIX.length()); return PREFIX + spec + "." + suffix; @@ -1022,9 +995,7 @@ public class FeatureTest { @Override protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { - currentContext.set(context); - return new PoolingManagerTest(host, controller, props, activeLatch); } } @@ -1032,6 +1003,7 @@ public class FeatureTest { /** * Pooling Manager with overrides. */ + private static class PoolingManagerTest extends PoolingManagerImpl { /** @@ -1042,9 +1014,9 @@ public class FeatureTest { * @param props the properties * @param activeLatch the latch */ + public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { - super(host, controller, props, activeLatch); } @@ -1067,6 +1039,7 @@ public class FeatureTest { /** * DMaaP Manager with overrides. */ + private static class DmaapManagerImpl extends DmaapManager { /** @@ -1076,6 +1049,7 @@ public class FeatureTest { * @param topic the topic * @throws PoolingFeatureException if an error occurs */ + public DmaapManagerImpl(String topic) throws PoolingFeatureException { super(topic); } @@ -1094,16 +1068,16 @@ public class FeatureTest { /** * Controller that also implements the {@link TopicListener} interface. */ - private static interface ListenerController extends PolicyController, TopicListener { + private static interface ListenerController extends PolicyController, TopicListener { } /** * Simple function that takes no arguments and returns nothing. */ + @FunctionalInterface private static interface VoidFunction { - void apply(); } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index f9878a9d..63bfc11e 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -38,6 +39,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -46,7 +48,6 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; -import org.onap.policy.drools.utils.Pair; public class PoolingFeatureTest { @@ -83,7 +84,7 @@ public class PoolingFeatureTest { /** * Setup. - * + * * @throws Exception exception */ @Before @@ -113,8 +114,8 @@ public class PoolingFeatureTest { pool.afterCreate(controller1); pool.afterCreate(controller2); - mgr1 = managers.get(0).first(); - mgr2 = managers.get(1).first(); + mgr1 = managers.get(0).getLeft(); + mgr2 = managers.get(1).getLeft(); } @Test @@ -245,7 +246,7 @@ public class PoolingFeatureTest { verify(mgr1).afterStop(); assertFalse(pool.afterStop(controllerDisabled)); - + // count should be unchanged verify(mgr1).afterStop(); } @@ -254,7 +255,7 @@ public class PoolingFeatureTest { public void testAfterHalt() { assertFalse(pool.afterHalt(controller1)); assertFalse(pool.afterHalt(controller1)); - + verify(mgr1, never()).afterStop(); assertFalse(pool.afterStop(controllerDisabled)); @@ -264,7 +265,7 @@ public class PoolingFeatureTest { public void testAfterShutdown() { assertFalse(pool.afterShutdown(controller1)); assertFalse(pool.afterShutdown(controller1)); - + verify(mgr1, never()).afterStop(); assertFalse(pool.afterStop(controllerDisabled)); @@ -515,7 +516,7 @@ public class PoolingFeatureTest { PoolingManagerImpl mgr = mock(PoolingManagerImpl.class); - managers.add(new Pair<>(mgr, props)); + managers.add(Pair.of(mgr, props)); return mgr; } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index c8cbdbb5..e24c3c1e 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -36,6 +37,7 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -44,7 +46,6 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; -import org.onap.policy.drools.utils.Pair; import org.onap.policy.drools.utils.Triple; public class ActiveStateTest extends SupportBasicStateTester { @@ -54,6 +55,7 @@ public class ActiveStateTest extends SupportBasicStateTester { /** * Setup. */ + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -70,7 +72,7 @@ public class ActiveStateTest extends SupportBasicStateTester { // ensure a heart beat was generated Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.second().getSource()); + assertEquals(MY_HOST, msg.getRight().getSource()); } @Test @@ -187,7 +189,7 @@ public class ActiveStateTest extends SupportBasicStateTester { state = new ActiveState(mgr); /* - * + * * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus * should be ignored. */ @@ -360,8 +362,8 @@ public class ActiveStateTest extends SupportBasicStateTester { verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class)); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first()); - assertEquals(MY_HOST, msg.second().getSource()); + assertEquals(MY_HOST, msg.getLeft()); + assertEquals(MY_HOST, msg.getRight().getSource()); } @Test @@ -454,8 +456,8 @@ public class ActiveStateTest extends SupportBasicStateTester { verify(mgr, times(1)).publish(any(), any()); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first()); - assertEquals(MY_HOST, msg.second().getSource()); + assertEquals(MY_HOST, msg.getLeft()); + assertEquals(MY_HOST, msg.getRight().getSource()); } @Test @@ -469,13 +471,13 @@ public class ActiveStateTest extends SupportBasicStateTester { // this message should go to itself msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(MY_HOST, msg.first()); - assertEquals(MY_HOST, msg.second().getSource()); + assertEquals(MY_HOST, msg.getLeft()); + assertEquals(MY_HOST, msg.getRight().getSource()); // this message should go to its successor msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(HOST1, msg.first()); - assertEquals(MY_HOST, msg.second().getSource()); + assertEquals(HOST1, msg.getLeft()); + assertEquals(MY_HOST, msg.getRight().getSource()); } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java index 77491616..ab468a1c 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -36,7 +38,6 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Query; -import org.onap.policy.drools.utils.Pair; public class InactiveStateTest extends SupportBasicStateTester { @@ -44,8 +45,9 @@ public class InactiveStateTest extends SupportBasicStateTester { /** * Setup. - * + * */ + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -111,13 +113,13 @@ public class InactiveStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_REACTIVATE_WAIT_MS, timer.first().longValue()); + assertEquals(STD_REACTIVATE_WAIT_MS, timer.getLeft().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goStart()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, timer.getRight().fire()); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index 97c9c95a..aa999b5d 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -39,7 +41,6 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.utils.Pair; public class QueryStateTest extends SupportBasicStateTester { @@ -48,6 +49,7 @@ public class QueryStateTest extends SupportBasicStateTester { /** * Setup. */ + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -77,8 +79,8 @@ public class QueryStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); - assertNotNull(timer.second()); + assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue()); + assertNotNull(timer.getRight()); } @Test @@ -251,15 +253,15 @@ public class QueryStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); - assertNotNull(timer.second()); + assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue()); + assertNotNull(timer.getRight()); // should published an Offline message and go inactive State next = mock(State.class); when(mgr.goStart()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, timer.getRight().fire()); // should continue distributing verify(mgr, never()).startDistributing(null); @@ -275,13 +277,13 @@ public class QueryStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); - assertNotNull(timer.second()); + assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue()); + assertNotNull(timer.getRight()); State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, timer.getRight().fire()); // should have published a Leader message Leader msg = captureAdminMessage(Leader.class); @@ -304,14 +306,14 @@ public class QueryStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); - assertNotNull(timer.second()); + assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue()); + assertNotNull(timer.getRight()); // set up active state, as that's what it should return State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, timer.getRight().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); @@ -335,14 +337,14 @@ public class QueryStateTest extends SupportBasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); - assertNotNull(timer.second()); + assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue()); + assertNotNull(timer.getRight()); // set up inactive state, as that's what it should return State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second().fire()); + assertEquals(next, timer.getRight().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index 092657e5..1fd49c50 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,6 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.Forward; @@ -40,7 +42,6 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; -import org.onap.policy.drools.utils.Pair; import org.onap.policy.drools.utils.Triple; public class StartStateTest extends SupportBasicStateTester { @@ -50,6 +51,7 @@ public class StartStateTest extends SupportBasicStateTester { /** * Setup. */ + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -81,8 +83,8 @@ public class StartStateTest extends SupportBasicStateTester { Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first()); - assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs()); + assertEquals(MY_HOST, msg.getLeft()); + assertEquals(state.getHbTimestampMs(), msg.getRight().getTimestampMs()); /* @@ -95,11 +97,11 @@ public class StartStateTest extends SupportBasicStateTester { // invoke the task - it should generate another heartbeat assertEquals(null, generator.third().fire()); - verify(mgr, times(2)).publish(MY_HOST, msg.second()); + verify(mgr, times(2)).publish(MY_HOST, msg.getRight()); // and again assertEquals(null, generator.third().fire()); - verify(mgr, times(3)).publish(MY_HOST, msg.second()); + verify(mgr, times(3)).publish(MY_HOST, msg.getRight()); /* @@ -107,13 +109,13 @@ public class StartStateTest extends SupportBasicStateTester { */ Pair<Long, StateTimerTask> checker = onceTasks.removeFirst(); - assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue()); + assertEquals(STD_HEARTBEAT_WAIT_MS, checker.getLeft().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, checker.second().fire()); + assertEquals(next, checker.getRight().fire()); verify(mgr).startDistributing(null); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java index 4727652a..a1246938 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java @@ -3,13 +3,14 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,13 +36,13 @@ import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.tuple.Pair; import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; -import org.onap.policy.drools.utils.Pair; import org.onap.policy.drools.utils.Triple; /** @@ -116,7 +117,7 @@ public class SupportBasicStateTester { /** * Setup. - * + * * @throws Exception throws exception */ public void setUp() throws Exception { @@ -152,7 +153,7 @@ public class SupportBasicStateTester { // capture publish() arguments doAnswer(invocation -> { Object[] args = invocation.getArguments(); - published.add(new Pair<>((String) args[0], (Message) args[1])); + published.add(Pair.of((String) args[0], (Message) args[1])); return null; }).when(mgr).publish(anyString(), any(Message.class)); @@ -168,7 +169,7 @@ public class SupportBasicStateTester { // capture schedule() arguments, and return a new future when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> { Object[] args = invocation.getArguments(); - onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1])); + onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1])); CancellableScheduledTask sched = mock(CancellableScheduledTask.class); onceSchedules.add(sched); @@ -198,7 +199,7 @@ public class SupportBasicStateTester { /** * Makes a sorted set of hosts. - * + * * @param hosts the hosts to be sorted * @return the set of hosts, sorted */ @@ -208,7 +209,7 @@ public class SupportBasicStateTester { /** * Captures the host array from the Leader message published to the admin channel. - * + * * @return the host array, as a list */ protected List<String> captureHostList() { @@ -217,7 +218,7 @@ public class SupportBasicStateTester { /** * Captures the host array from the Leader message published to the admin channel. - * + * * @return the host array */ protected String[] captureHostArray() { @@ -231,7 +232,7 @@ public class SupportBasicStateTester { /** * Captures the assignments from the Leader message published to the admin channel. - * + * * @return the bucket assignments */ protected BucketAssignments captureAssignments() { @@ -244,7 +245,7 @@ public class SupportBasicStateTester { /** * Captures the message published to the admin channel. - * + * * @param clazz type of {@link Message} to capture * @return the message that was published */ @@ -254,7 +255,7 @@ public class SupportBasicStateTester { /** * Captures the message published to the admin channel. - * + * * @param clazz type of {@link Message} to capture * @param index index of the item to be captured * @return the message that was published @@ -265,7 +266,7 @@ public class SupportBasicStateTester { /** * Captures the message published to the non-admin channels. - * + * * @param clazz type of {@link Message} to capture * @return the (channel,message) pair that was published */ @@ -275,13 +276,13 @@ public class SupportBasicStateTester { /** * Captures the message published to the non-admin channels. - * + * * @param clazz type of {@link Message} to capture * @param index index of the item to be captured * @return the (channel,message) pair that was published */ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) { Pair<String, Message> msg = published.get(index); - return new Pair<>(msg.first(), clazz.cast(msg.second())); + return Pair.of(msg.getLeft(), clazz.cast(msg.getRight())); } } |