diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java')
-rw-r--r-- | feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java | 164 |
1 files changed, 69 insertions, 95 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index c35e525a..96b358da 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.tuple.Pair; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +63,6 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; -import org.onap.policy.drools.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,25 +80,21 @@ import org.slf4j.LoggerFactory; * * <p>Invoke {@link #runSlow()}, before the test, to slow things down. */ -public class FeatureTest { +public class FeatureTest { private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class); - /** * Name of the topic used for inter-host communication. */ private static final String INTERNAL_TOPIC = "my.internal.topic"; - /** * Name of the topic from which "external" events "arrive". */ private static final String EXTERNAL_TOPIC = "my.external.topic"; - /** * Name of the controller. */ private static final String CONTROLLER1 = "controller.one"; - private static long stdReactivateWaitMs = 200; private static long stdIdentificationMs = 60; private static long stdStartHeartbeatMs = 60; @@ -111,28 +108,26 @@ public class FeatureTest { * Used to decode events from the external topic. */ private static final Gson mapper = new Gson(); - /** * Used to identify the current context. */ private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>(); - /** * Context for the current test case. */ private Context ctx; - /** * Setup. */ + @Before public void setUp() { ctx = null; } - /** * Tear down. */ + @After public void tearDown() { if (ctx != null) { @@ -157,19 +152,14 @@ public class FeatureTest { private void run(int nmessages, int nhosts) throws Exception { ctx = new Context(nmessages); - for (int x = 0; x < nhosts; ++x) { ctx.addHost(); } - ctx.startHosts(); - for (int x = 0; x < nmessages; ++x) { ctx.offerExternal(makeMessage(x)); } - ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS); - assertEquals(0, ctx.getDecodeErrors()); assertEquals(0, ctx.getRemainingEvents()); ctx.checkAllSawAMsg(); @@ -178,10 +168,10 @@ public class FeatureTest { private String makeMessage(int reqnum) { return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; } - /** * Invoke this to slow the timers down. */ + protected static void runSlow() { stdReactivateWaitMs = 10000; stdIdentificationMs = 10000; @@ -193,58 +183,50 @@ public class FeatureTest { stdInterPollMs = 2000; stdEventWaitSec = 1000; } - /** * Decodes an event. * * @param event event * @return the decoded event, or {@code null} if it cannot be decoded */ + private static Object decodeEvent(String event) { try { return mapper.fromJson(event, TreeMap.class); - } catch (JsonParseException e) { logger.warn("cannot decode external event", e); return null; } } - /** * Context used for a single test case. */ - private static class Context { + private static class Context { /** * Hosts that have been added to this context. */ private final Deque<Host> hosts = new LinkedList<>(); - /** * Maps a drools controller to its policy controller. */ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); - /** * Maps a channel to its queue. Does <i>not</i> include the "admin" channel. */ private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7); - /** * Queue for the external "DMaaP" topic. */ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>(); - /** * Counts the number of decode errors. */ private final AtomicInteger numDecodeErrors = new AtomicInteger(0); - /** * Number of events we're still waiting to receive. */ private final CountDownLatch eventCounter; - /** * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by * {@link #getCurrentHost()}. @@ -256,13 +238,14 @@ public class FeatureTest { * * @param nEvents number of events to be processed */ + public Context(int events) { eventCounter = new CountDownLatch(events); } - /** * Destroys the context, stopping any hosts that remain. */ + public void destroy() { stopHosts(); hosts.clear(); @@ -273,16 +256,17 @@ public class FeatureTest { * * @return the new Host */ + public Host addHost() { Host host = new Host(this); hosts.add(host); - return host; } /** * Starts the hosts. */ + public void startHosts() { hosts.forEach(host -> host.start()); } @@ -290,6 +274,7 @@ public class FeatureTest { /** * Stops the hosts. */ + public void stopHosts() { hosts.forEach(host -> host.stop()); } @@ -297,6 +282,7 @@ public class FeatureTest { /** * Verifies that all hosts processed at least one message. */ + public void checkAllSawAMsg() { int msgs = 0; for (Host host : hosts) { @@ -312,6 +298,7 @@ public class FeatureTest { * @param host host * @param func function to invoke */ + public void withHost(Host host, VoidFunction func) { currentHost = host; func.apply(); @@ -323,6 +310,7 @@ public class FeatureTest { * * @param event event */ + public void offerExternal(String event) { externalTopic.offer(event); } @@ -333,6 +321,7 @@ public class FeatureTest { * @param channel channel * @param queue the channel's queue */ + public void addInternal(String channel, BlockingQueue<String> queue) { channel2queue.put(channel, queue); } @@ -342,6 +331,7 @@ public class FeatureTest { * * @param message message */ + public void offerInternal(String message) { channel2queue.values().forEach(queue -> queue.offer(message)); } @@ -352,6 +342,7 @@ public class FeatureTest { * @param channel channel * @param message message */ + public void offerInternal(String channel, String message) { BlockingQueue<String> queue = channel2queue.get(channel); if (queue != null) { @@ -365,6 +356,7 @@ public class FeatureTest { * @param controller controller * @param droolsController drools controller */ + public void addController(PolicyController controller, DroolsController droolsController) { drools2policy.put(droolsController, controller); } @@ -376,6 +368,7 @@ public class FeatureTest { * @return the controller associated with a drools controller, or {@code null} if * it has no associated controller */ + public PolicyController getController(DroolsController droolsController) { return drools2policy.get(droolsController); } @@ -385,6 +378,7 @@ public class FeatureTest { * * @return queue for the external topic */ + public BlockingQueue<String> getExternalTopic() { return externalTopic; } @@ -394,6 +388,7 @@ public class FeatureTest { * * @return the number of decode errors so far */ + public int getDecodeErrors() { return numDecodeErrors.get(); } @@ -401,6 +396,7 @@ public class FeatureTest { /** * Increments the count of decode errors. */ + public void bumpDecodeErrors() { numDecodeErrors.incrementAndGet(); } @@ -410,6 +406,7 @@ public class FeatureTest { * * @return the number of events that haven't been processed */ + public long getRemainingEvents() { return eventCounter.getCount(); } @@ -417,6 +414,7 @@ public class FeatureTest { /** * Adds an event to the counter. */ + public void addEvent() { eventCounter.countDown(); } @@ -429,6 +427,7 @@ public class FeatureTest { * @return {@code true} if all events have been processed, {@code false} otherwise * @throws InterruptedException throws interrupted */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { return eventCounter.await(time, units); } @@ -439,6 +438,7 @@ public class FeatureTest { * * @return the current host, or {@code null} if there is no current host */ + public Host getCurrentHost() { return currentHost; } @@ -447,25 +447,27 @@ public class FeatureTest { /** * Simulates a single "host". */ - private static class Host { + private static class Host { private final Context context; - private final PoolingFeature feature; /** * {@code True} if this host has processed a message, {@code false} otherwise. */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); /** * This host's internal "DMaaP" topic. */ + private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(); /** * Source that reads from the external topic and posts to the listener. */ + private TopicSource externalSource; // mock objects @@ -478,25 +480,20 @@ public class FeatureTest { * * @param context context */ + public Host(Context context) { this.context = context; - when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { externalSource.unregister(controller); return true; }); - doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); - context.addController(controller, drools); - // arrange to read from the external topic externalSource = new TopicSourceImpl(context, false); - feature = new PoolingFeatureImpl(context); } @@ -505,6 +502,7 @@ public class FeatureTest { * * @return the host name */ + public String getName() { return feature.getHost(); } @@ -513,21 +511,16 @@ public class FeatureTest { * Starts threads for the host so that it begins consuming from both the external * "DMaaP" topic and its own internal "DMaaP" topic. */ - public void start() { + public void start() { context.withHost(this, () -> { - feature.beforeStart(engine); feature.afterCreate(controller); - // assign the queue for this host's internal topic context.addInternal(getName(), msgQueue); - feature.beforeStart(controller); - // start consuming events from the external topic externalSource.register(controller); - feature.afterStart(controller); }); } @@ -535,6 +528,7 @@ public class FeatureTest { /** * Stops the host's threads. */ + public void stop() { feature.beforeStop(controller); externalSource.unregister(controller); @@ -549,6 +543,7 @@ public class FeatureTest { * @param event event * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { return feature.beforeOffer(controller, protocol, topic2, event); } @@ -562,8 +557,8 @@ public class FeatureTest { * @param success success * @return {@code true} if the event was handled, {@code false} otherwise */ - public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { return feature.afterOffer(controller, protocol, topic, event, success); } @@ -573,6 +568,7 @@ public class FeatureTest { * @param fact fact * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean beforeInsert(Object fact) { return feature.beforeInsert(drools, fact); } @@ -585,6 +581,7 @@ public class FeatureTest { * controller, {@code false} otherwise * @return {@code true} if the event was handled, {@code false} otherwise */ + public boolean afterInsert(Object fact, boolean successInsert) { return feature.afterInsert(drools, fact, successInsert); } @@ -592,6 +589,7 @@ public class FeatureTest { /** * Indicates that a message was seen for this host. */ + public void sawMessage() { sawMsg.set(true); } @@ -602,6 +600,7 @@ public class FeatureTest { * @return {@code true} if a message was seen for this host, {@code false} * otherwise */ + public boolean messageSeen() { return sawMsg.get(); } @@ -611,6 +610,7 @@ public class FeatureTest { * * @return the queue associated with this host's internal topic */ + public BlockingQueue<String> getInternalQueue() { return msgQueue; } @@ -620,8 +620,8 @@ public class FeatureTest { * Listener for the external topic. Simulates the actions taken by * <i>AggregatedPolicyController.onTopicEvent</i>. */ - private static class MyExternalTopicListener implements Answer<Void> { + private static class MyExternalTopicListener implements Answer<Void> { private final Context context; private final Host host; @@ -636,30 +636,23 @@ public class FeatureTest { CommInfrastructure commType = args.getArgument(index++); String topic = args.getArgument(index++); String event = args.getArgument(index++); - if (host.beforeOffer(commType, topic, event)) { return null; } - boolean result; Object fact = decodeEvent(event); - if (fact == null) { result = false; context.bumpDecodeErrors(); - } else { result = true; - if (!host.beforeInsert(fact)) { // feature did not handle it so we handle it here host.afterInsert(fact, result); - host.sawMessage(); context.addEvent(); } } - host.afterOffer(commType, topic, event, result); return null; } @@ -670,10 +663,9 @@ public class FeatureTest { * <i>channel</i> embedded within the message. If it's the "admin" channel, then the * message is placed on all queues. */ - private static class TopicSinkImpl extends TopicImpl implements TopicSink { + private static class TopicSinkImpl extends TopicImpl implements TopicSink { private final Context context; - /** * Used to decode the messages so that the channel can be extracted. */ @@ -684,6 +676,7 @@ public class FeatureTest { * * @param context context */ + public TopicSinkImpl(Context context) { this.context = context; } @@ -693,22 +686,17 @@ public class FeatureTest { if (!isAlive()) { return false; } - try { Message msg = serializer.decodeMsg(message); String channel = msg.getChannel(); - if (Message.ADMIN.equals(channel)) { // add to every queue context.offerInternal(message); - } else { // add to a specific queue context.offerInternal(channel, message); } - return true; - } catch (JsonParseException e) { logger.warn("could not decode message: {}", message); context.bumpDecodeErrors(); @@ -720,15 +708,14 @@ public class FeatureTest { /** * Source implementation that reads from a queue associated with a topic. */ + private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource { private final String topic; - /** * Queue from which to retrieve messages. */ private final BlockingQueue<String> queue; - /** * Manages the current consumer thread. The "first" item is used as a trigger to * tell the thread to stop processing, while the "second" item is triggered <i>by @@ -743,11 +730,11 @@ public class FeatureTest { * @param internal {@code true} if to read from the internal topic, {@code false} * to read from the external topic */ + public TopicSourceImpl(Context context, boolean internal) { if (internal) { this.topic = INTERNAL_TOPIC; this.queue = context.getCurrentHost().getInternalQueue(); - } else { this.topic = EXTERNAL_TOPIC; this.queue = context.getExternalTopic(); @@ -773,33 +760,25 @@ public class FeatureTest { * Starts a thread that takes messages from the queue and gives them to the * listener. Stops the thread of any previously registered listener. */ + @Override public void register(TopicListener listener) { - Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1)); - + Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1)); reregister(newPair); - Thread thread = new Thread(() -> { - try { do { - processMessages(newPair.first(), listener); - } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS)); - + processMessages(newPair.getLeft(), listener); + } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS)); logger.info("topic source thread completed"); - } catch (InterruptedException e) { logger.warn("topic source thread aborted", e); Thread.currentThread().interrupt(); - } catch (RuntimeException e) { logger.warn("topic source thread aborted", e); } - - newPair.second().countDown(); - + newPair.getRight().countDown(); }); - thread.setDaemon(true); thread.start(); } @@ -807,6 +786,7 @@ public class FeatureTest { /** * Stops the thread of <i>any</i> currently registered listener. */ + @Override public void unregister(TopicListener listener) { reregister(null); @@ -818,6 +798,7 @@ public class FeatureTest { * * @param newPair the new "pair", or {@code null} to unregister */ + private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) { try { Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair); @@ -826,26 +807,20 @@ public class FeatureTest { // unregister was invoked twice in a row logger.warn("re-unregister for topic source"); } - // no previous thread to stop return; } - // need to stop the previous thread - // tell it to stop - oldPair.first().countDown(); - + oldPair.getLeft().countDown(); // wait for it to stop - if (!oldPair.second().await(2, TimeUnit.SECONDS)) { + if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) { logger.warn("old topic registration is still running"); } - } catch (InterruptedException e) { logger.warn("old topic registration may still be running", e); Thread.currentThread().interrupt(); } - if (newPair != null) { // register was invoked twice in a row logger.warn("re-register for topic source"); @@ -859,15 +834,13 @@ public class FeatureTest { * @param listener listener * @throws InterruptedException throws interrupted exception */ - private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { + private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) { - String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS); if (msg == null) { return; } - listener.onTopicEvent(CommInfrastructure.UEB, topic, msg); } } @@ -877,11 +850,13 @@ public class FeatureTest { * Topic implementation. Most methods just throw * {@link UnsupportedOperationException}. */ + private static class TopicImpl implements Topic { /** * Constructor. */ + public TopicImpl() { super(); } @@ -960,8 +935,8 @@ public class FeatureTest { /** * Feature with overrides. */ - private static class PoolingFeatureImpl extends PoolingFeature { + private static class PoolingFeatureImpl extends PoolingFeature { private final Context context; /** @@ -969,9 +944,9 @@ public class FeatureTest { * * @param context context */ + public PoolingFeatureImpl(Context context) { this.context = context; - /* * Note: do NOT extract anything from "context" at this point, because it * hasn't been fully initialized yet @@ -981,9 +956,7 @@ public class FeatureTest { @Override public Properties getProperties(String featName) { Properties props = new Properties(); - props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); - props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); @@ -998,7 +971,6 @@ public class FeatureTest { "" + stdActiveHeartbeatMs); props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs); - return props; } @@ -1014,6 +986,7 @@ public class FeatureTest { * @param spec specializer to be embedded * @return the property name, with the specializer embedded within it */ + private String specialize(String propnm, String spec) { String suffix = propnm.substring(PREFIX.length()); return PREFIX + spec + "." + suffix; @@ -1022,9 +995,7 @@ public class FeatureTest { @Override protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { - currentContext.set(context); - return new PoolingManagerTest(host, controller, props, activeLatch); } } @@ -1032,6 +1003,7 @@ public class FeatureTest { /** * Pooling Manager with overrides. */ + private static class PoolingManagerTest extends PoolingManagerImpl { /** @@ -1042,9 +1014,9 @@ public class FeatureTest { * @param props the properties * @param activeLatch the latch */ + public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { - super(host, controller, props, activeLatch); } @@ -1067,6 +1039,7 @@ public class FeatureTest { /** * DMaaP Manager with overrides. */ + private static class DmaapManagerImpl extends DmaapManager { /** @@ -1076,6 +1049,7 @@ public class FeatureTest { * @param topic the topic * @throws PoolingFeatureException if an error occurs */ + public DmaapManagerImpl(String topic) throws PoolingFeatureException { super(topic); } @@ -1094,16 +1068,16 @@ public class FeatureTest { /** * Controller that also implements the {@link TopicListener} interface. */ - private static interface ListenerController extends PolicyController, TopicListener { + private static interface ListenerController extends PolicyController, TopicListener { } /** * Simple function that takes no arguments and returns nothing. */ + @FunctionalInterface private static interface VoidFunction { - void apply(); } } |