summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java1158
1 files changed, 1158 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
new file mode 100644
index 00000000..13d70f52
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -0,0 +1,1158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
+import org.onap.policy.drools.event.comm.Topic;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object.
+ */
+public class FeatureTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
+
+ /**
+ * Name of the topic used for inter-host communication.
+ */
+ private static final String INTERNAL_TOPIC = "my.internal.topic";
+
+ /**
+ * Name of the topic from which "external" events "arrive".
+ */
+ private static final String EXTERNAL_TOPIC = "my.external.topic";
+
+ /**
+ * Name of the controller.
+ */
+ private static final String CONTROLLER1 = "controller.one";
+
+ // private static final long STD_HEARTBEAT_WAIT_MS = 100;
+ // private static final long STD_REACTIVATE_WAIT_MS = 200;
+ // private static final long STD_IDENTIFICATION_MS = 60;
+ // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
+ // private static final long STD_INTER_HEARTBEAT_MS = 50;
+ // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ // private static final long POLL_MS = 2;
+ // private static final long INTER_POLL_MS = 2;
+ // private static final long EVENT_WAIT_SEC = 5;
+
+ // use these to slow things down
+ private static final long STD_HEARTBEAT_WAIT_MS = 5000;
+ private static final long STD_REACTIVATE_WAIT_MS = 10000;
+ private static final long STD_IDENTIFICATION_MS = 10000;
+ private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
+ private static final long STD_INTER_HEARTBEAT_MS = 12000;
+ private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ private static final long POLL_MS = 2;
+ private static final long INTER_POLL_MS = 2000;
+ private static final long EVENT_WAIT_SEC = 1000;
+
+ // these are saved and restored on exit from this test class
+ private static PoolingFeature.Factory saveFeatureFactory;
+ private static PoolingManagerImpl.Factory saveManagerFactory;
+ private static DmaapManager.Factory saveDmaapFactory;
+
+ /**
+ * Context for the current test case.
+ */
+ private Context ctx;
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveFeatureFactory = PoolingFeature.getFactory();
+ saveManagerFactory = PoolingManagerImpl.getFactory();
+ saveDmaapFactory = DmaapManager.getFactory();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PoolingFeature.setFactory(saveFeatureFactory);
+ PoolingManagerImpl.setFactory(saveManagerFactory);
+ DmaapManager.setFactory(saveDmaapFactory);
+ }
+
+ @Before
+ public void setUp() {
+ ctx = null;
+ }
+
+ @After
+ public void tearDown() {
+ if (ctx != null) {
+ ctx.destroy();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void test_SingleHost() throws Exception {
+ int nmessages = 70;
+
+ ctx = new Context(nmessages);
+
+ ctx.addHost();
+ ctx.startHosts();
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ @Ignore
+ @Test
+ public void test_TwoHosts() throws Exception {
+ int nmessages = 200;
+
+ ctx = new Context(nmessages);
+
+ ctx.addHost();
+ ctx.addHost();
+ ctx.startHosts();
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ // wait for all hosts to have time to process a few messages
+ Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
+
+ // pause a topic for a bit
+// ctx.pauseTopic();
+
+ // now we'll see if it recovers
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ private String makeMessage(int reqnum) {
+ return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+ }
+
+ /**
+ * Context used for a single test case.
+ */
+ private static class Context {
+
+ private final FeatureFactory featureFactory;
+ private final ManagerFactory managerFactory;
+ private final DmaapFactory dmaapFactory;
+
+ /**
+ * Hosts that have been added to this context.
+ */
+ private final Deque<Host> hosts = new LinkedList<>();
+
+ /**
+ * Maps a drools controller to its policy controller.
+ */
+ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+ /**
+ * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
+ */
+ private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
+
+ /**
+ * Queue for the external "DMaaP" topic.
+ */
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
+ /**
+ * Counts the number of decode errors.
+ */
+ private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
+
+ /**
+ * Number of events we're still waiting to receive.
+ */
+ private final CountDownLatch eventCounter;
+
+ /**
+ * Maps host name to its topic source. This must be in sorted order so we can
+ * identify the source for the host with the higher name.
+ */
+ private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
+
+ /**
+ * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
+ * {@link #getCurrentHost()}.
+ */
+ private Host currentHost = null;
+
+ /**
+ *
+ * @param nEvents number of events to be processed
+ */
+ public Context(int nEvents) {
+ featureFactory = new FeatureFactory(this);
+ managerFactory = new ManagerFactory(this);
+ dmaapFactory = new DmaapFactory(this);
+ eventCounter = new CountDownLatch(nEvents);
+
+ PoolingFeature.setFactory(featureFactory);
+ PoolingManagerImpl.setFactory(managerFactory);
+ DmaapManager.setFactory(dmaapFactory);
+ }
+
+ /**
+ * Destroys the context, stopping any hosts that remain.
+ */
+ public void destroy() {
+ stopHosts();
+ hosts.clear();
+ }
+
+ /**
+ * Creates and adds a new host to the context.
+ */
+ public void addHost() {
+ hosts.add(new Host(this));
+ }
+
+ /**
+ * Starts the hosts.
+ */
+ public void startHosts() {
+ hosts.forEach(host -> host.start());
+ }
+
+ /**
+ * Stops the hosts.
+ */
+ public void stopHosts() {
+ hosts.forEach(host -> host.stop());
+ }
+
+ /**
+ * Verifies that all hosts processed at least one message.
+ */
+ public void checkAllSawAMsg() {
+ int x = 0;
+ for (Host host : hosts) {
+ assertTrue("x=" + x, host.messageSeen());
+ ++x;
+ }
+ }
+
+ /**
+ * Sets {@link #currentHost} to the specified host, and then invokes the given
+ * function. Resets {@link #currentHost} to {@code null} before returning.
+ *
+ * @param host
+ * @param func function to invoke
+ */
+ public void withHost(Host host, VoidFunction func) {
+ currentHost = host;
+ func.apply();
+ currentHost = null;
+ }
+
+ /**
+ * Offers an event to the external topic.
+ *
+ * @param event
+ */
+ public void offerExternal(String event) {
+ externalTopic.offer(event);
+ }
+
+ /**
+ * Adds an internal channel to the set of channels.
+ *
+ * @param channel
+ * @param queue the channel's queue
+ */
+ public void addInternal(String channel, BlockingQueue<String> queue) {
+ channel2queue.put(channel, queue);
+ }
+
+ /**
+ * Offers a message to all internal channels.
+ *
+ * @param message
+ */
+ public void offerInternal(String message) {
+ channel2queue.values().forEach(queue -> queue.offer(message));
+ }
+
+ /**
+ * Offers amessage to an internal channel.
+ *
+ * @param channel
+ * @param message
+ */
+ public void offerInternal(String channel, String message) {
+ BlockingQueue<String> queue = channel2queue.get(channel);
+ if (queue != null) {
+ queue.offer(message);
+ }
+ }
+
+ /**
+ * Decodes an event.
+ *
+ * @param event
+ * @return the decoded event, or {@code null} if it cannot be decoded
+ */
+ public Object decodeEvent(String event) {
+ return managerFactory.decodeEvent(null, null, event);
+ }
+
+ /**
+ * Associates a controller with its drools controller.
+ *
+ * @param controller
+ * @param droolsController
+ */
+ public void addController(PolicyController controller, DroolsController droolsController) {
+ drools2policy.put(droolsController, controller);
+ }
+
+ /**
+ * @param droolsController
+ * @return the controller associated with a drools controller, or {@code null} if
+ * it has no associated controller
+ */
+ public PolicyController getController(DroolsController droolsController) {
+ return drools2policy.get(droolsController);
+ }
+
+ /**
+ * @return queue for the external topic
+ */
+ public BlockingQueue<String> getExternalTopic() {
+ return externalTopic;
+ }
+
+ /**
+ *
+ * @return the number of decode errors so far
+ */
+ public int getDecodeErrors() {
+ return nDecodeErrors.get();
+ }
+
+ /**
+ * Increments the count of decode errors.
+ */
+ public void bumpDecodeErrors() {
+ nDecodeErrors.incrementAndGet();
+ }
+
+ /**
+ *
+ * @return the number of events that haven't been processed
+ */
+ public long getRemainingEvents() {
+ return eventCounter.getCount();
+ }
+
+ /**
+ * Adds an event to the counter.
+ */
+ public void addEvent() {
+ eventCounter.countDown();
+ }
+
+ /**
+ * Waits, for a period of time, for all events to be processed.
+ *
+ * @param time
+ * @param units
+ * @return {@code true} if all events have been processed, {@code false} otherwise
+ * @throws InterruptedException
+ */
+ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+ return eventCounter.await(time, units);
+ }
+
+ /**
+ * Associates a host with a topic.
+ *
+ * @param host
+ * @param topic
+ */
+ public void addTopicSource(String host, TopicSourceImpl topic) {
+ host2topic.put(host, topic);
+ }
+
+ /**
+ * Pauses the last topic source long enough to miss a heart beat.
+ */
+ public void pauseTopic() {
+ Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
+ if (ent != null) {
+ ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
+ }
+ }
+
+ /**
+ * Gets the current host, provided this is used from within a call to
+ * {@link #withHost(Host, VoidFunction)}.
+ *
+ * @return the current host, or {@code null} if there is no current host
+ */
+ public Host getCurrentHost() {
+ return currentHost;
+ }
+ }
+
+ /**
+ * Simulates a single "host".
+ */
+ private static class Host {
+
+ private final Context context;
+
+ private final PoolingFeature feature = new PoolingFeature();
+
+ /**
+ * {@code True} if this host has processed a message, {@code false} otherwise.
+ */
+ private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+ /**
+ * This host's internal "DMaaP" topic.
+ */
+ private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * Source that reads from the external topic and posts to the listener.
+ */
+ private TopicSource externalSource;
+
+ // mock objects
+ private final PolicyEngine engine = mock(PolicyEngine.class);
+ private final ListenerController controller = mock(ListenerController.class);
+ private final DroolsController drools = mock(DroolsController.class);
+
+ /**
+ *
+ * @param context
+ */
+ public Host(Context context) {
+ this.context = context;
+
+ when(controller.getName()).thenReturn(CONTROLLER1);
+ when(controller.getDrools()).thenReturn(drools);
+
+ // stop consuming events if the controller stops
+ when(controller.stop()).thenAnswer(args -> {
+ externalSource.unregister(controller);
+ return true;
+ });
+
+ doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+ context.addController(controller, drools);
+
+ // arrange to read from the external topic
+ externalSource = new TopicSourceImpl(context, false);
+ }
+
+ /**
+ * Gets the host name. This should only be invoked within {@link #start()}.
+ *
+ * @return the host name
+ */
+ public String getName() {
+ return PoolingManagerImpl.getLastHost();
+ }
+
+ /**
+ * Starts threads for the host so that it begins consuming from both the external
+ * "DMaaP" topic and its own internal "DMaaP" topic.
+ */
+ public void start() {
+
+ context.withHost(this, () -> {
+
+ feature.beforeStart(engine);
+ feature.afterCreate(controller);
+
+ // assign the queue for this host's internal topic
+ context.addInternal(getName(), msgQueue);
+
+ feature.beforeStart(controller);
+
+ // start consuming events from the external topic
+ externalSource.register(controller);
+
+ feature.afterStart(controller);
+ });
+ }
+
+ /**
+ * Stops the host's threads.
+ */
+ public void stop() {
+ feature.beforeStop(controller);
+ externalSource.unregister(controller);
+ feature.afterStop(controller);
+ }
+
+ /**
+ * Offers an event to the feature, before the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ return feature.beforeOffer(controller, protocol, topic2, event);
+ }
+
+ /**
+ * Offers an event to the feature, after the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic
+ * @param event
+ * @param success
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+ return feature.afterOffer(controller, protocol, topic, event, success);
+ }
+
+ /**
+ * Offers an event to the feature, before the drools controller handles it.
+ *
+ * @param fact
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeInsert(Object fact) {
+ return feature.beforeInsert(drools, fact);
+ }
+
+ /**
+ * Offers an event to the feature, after the drools controller handles it.
+ *
+ * @param fact
+ * @param successInsert {@code true} if it was successfully inserted by the drools
+ * controller, {@code false} otherwise
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterInsert(Object fact, boolean successInsert) {
+ return feature.afterInsert(drools, fact, successInsert);
+ }
+
+ /**
+ * Indicates that a message was seen for this host.
+ */
+ public void sawMessage() {
+ sawMsg.set(true);
+ }
+
+ /**
+ *
+ * @return {@code true} if a message was seen for this host, {@code false}
+ * otherwise
+ */
+ public boolean messageSeen() {
+ return sawMsg.get();
+ }
+
+ /**
+ * @return the queue associated with this host's internal topic
+ */
+ public BlockingQueue<String> getInternalQueue() {
+ return msgQueue;
+ }
+ }
+
+ /**
+ * Listener for the external topic. Simulates the actions taken by
+ * <i>AggregatedPolicyController.onTopicEvent</i>.
+ */
+ private static class MyExternalTopicListener implements Answer<Void> {
+
+ private final Context context;
+ private final Host host;
+
+ public MyExternalTopicListener(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+ }
+
+ @Override
+ public Void answer(InvocationOnMock args) throws Throwable {
+ int i = 0;
+ CommInfrastructure commType = args.getArgument(i++);
+ String topic = args.getArgument(i++);
+ String event = args.getArgument(i++);
+
+ if (host.beforeOffer(commType, topic, event)) {
+ return null;
+ }
+
+ boolean result;
+ Object fact = context.decodeEvent(event);
+
+ if (fact == null) {
+ result = false;
+ context.bumpDecodeErrors();
+
+ } else {
+ result = true;
+
+ if (!host.beforeInsert(fact)) {
+ // feature did not handle it so we handle it here
+ host.afterInsert(fact, result);
+
+ host.sawMessage();
+ context.addEvent();
+ }
+ }
+
+ host.afterOffer(commType, topic, event, result);
+ return null;
+ }
+ }
+
+ /**
+ * Sink implementation that puts a message on the queue specified by the
+ * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
+ * message is placed on all queues.
+ */
+ private static class TopicSinkImpl extends TopicImpl implements TopicSink {
+
+ private final Context context;
+
+ /**
+ * Used to decode the messages so that the channel can be extracted.
+ */
+ private final Serializer serializer = new Serializer();
+
+ /**
+ *
+ * @param context
+ */
+ public TopicSinkImpl(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public synchronized boolean send(String message) {
+ if (!isAlive()) {
+ return false;
+ }
+
+ try {
+ Message msg = serializer.decodeMsg(message);
+ String channel = msg.getChannel();
+
+ if (Message.ADMIN.equals(channel)) {
+ // add to every queue
+ context.offerInternal(message);
+
+ } else {
+ // add to a specific queue
+ context.offerInternal(channel, message);
+ }
+
+ return true;
+
+ } catch (IOException e) {
+ logger.warn("could not decode message: {}", message);
+ context.bumpDecodeErrors();
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Source implementation that reads from a queue associated with a topic.
+ */
+ private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+
+ private final String topic;
+
+ /**
+ * Queue from which to retrieve messages.
+ */
+ private final BlockingQueue<String> queue;
+
+ /**
+ * Manages the current consumer thread. The "first" item is used as a trigger to
+ * tell the thread to stop processing, while the "second" item is triggered <i>by
+ * the thread</i> when it completes.
+ */
+ private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
+
+ /**
+ * Time, in milliseconds, to pause before polling for more messages.
+ */
+ private AtomicLong pauseTimeMs = new AtomicLong(0);
+
+ /**
+ *
+ * @param context
+ * @param internal {@code true} if to read from the internal topic, {@code false}
+ * to read from the external topic
+ */
+ public TopicSourceImpl(Context context, boolean internal) {
+ if (internal) {
+ Host host = context.getCurrentHost();
+
+ this.topic = INTERNAL_TOPIC;
+ this.queue = host.getInternalQueue();
+
+ context.addTopicSource(host.getName(), this);
+
+ } else {
+ this.topic = EXTERNAL_TOPIC;
+ this.queue = context.getExternalTopic();
+ }
+ }
+
+ @Override
+ public void setFilter(String filter) {
+ logger.info("topic filter set to: {}", filter);
+ }
+
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean offer(String event) {
+ throw new UnsupportedOperationException("offer topic source");
+ }
+
+ /**
+ * Starts a thread that takes messages from the queue and gives them to the
+ * listener. Stops the thread of any previously registered listener.
+ */
+ @Override
+ public void register(TopicListener listener) {
+ Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
+
+ reregister(newPair);
+
+ new Thread(() -> {
+ try {
+ do {
+ processMessages(newPair.first(), listener);
+ } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+
+ logger.info("topic source thread completed");
+
+ } catch (InterruptedException e) {
+ logger.warn("topic source thread aborted", e);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException e) {
+ logger.warn("topic source thread aborted", e);
+ }
+
+ newPair.second().countDown();
+
+ }).start();
+ }
+
+ /**
+ * Stops the thread of <i>any</i> currently registered listener.
+ */
+ @Override
+ public void unregister(TopicListener listener) {
+ reregister(null);
+ }
+
+ /**
+ * Registers a new "pair" with this source, stopping the consumer associated with
+ * any previous registration.
+ *
+ * @param newPair the new "pair", or {@code null} to unregister
+ */
+ private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
+ try {
+ Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
+ if (oldPair == null) {
+ if (newPair == null) {
+ // unregister was invoked twice in a row
+ logger.warn("re-unregister for topic source");
+ }
+
+ // no previous thread to stop
+ return;
+ }
+
+ // need to stop the previous thread
+
+ // tell it to stop
+ oldPair.first().countDown();
+
+ // wait for it to stop
+ if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
+ logger.warn("old topic registration is still running");
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("old topic registration may still be running", e);
+ Thread.currentThread().interrupt();
+ }
+
+ if (newPair != null) {
+ // register was invoked twice in a row
+ logger.warn("re-register for topic source");
+ }
+ }
+
+ /**
+ * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
+ * pause a bit.
+ *
+ * @param timeMs time, in milliseconds, to pause
+ */
+ public void pause(long timeMs) {
+ pauseTimeMs.set(timeMs);
+ }
+
+ /**
+ * Polls for messages from the topic and offers them to the listener. If
+ * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
+ * then immediately returns.
+ *
+ * @param stopped triggered if processing should stop
+ * @param listener
+ * @throws InterruptedException
+ */
+ private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
+
+ for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
+
+ long ptm = pauseTimeMs.getAndSet(0);
+ if (ptm != 0) {
+ logger.warn("pause processing");
+ stopped.await(ptm, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+ if (msg == null) {
+ return;
+ }
+
+ listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
+ }
+ }
+ }
+
+ /**
+ * Topic implementation. Most methods just throw
+ * {@link UnsupportedOperationException}.
+ */
+ private static class TopicImpl implements Topic {
+
+ /**
+ * {@code True} if this topic is alive/running, {@code false} otherwise.
+ */
+ private boolean alive = false;
+
+ /**
+ *
+ */
+ public TopicImpl() {
+ super();
+ }
+
+ @Override
+ public String getTopic() {
+ return INTERNAL_TOPIC;
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ throw new UnsupportedOperationException("topic protocol");
+ }
+
+ @Override
+ public List<String> getServers() {
+ throw new UnsupportedOperationException("topic servers");
+ }
+
+ @Override
+ public String[] getRecentEvents() {
+ throw new UnsupportedOperationException("topic events");
+ }
+
+ @Override
+ public void register(TopicListener topicListener) {
+ throw new UnsupportedOperationException("register topic");
+ }
+
+ @Override
+ public void unregister(TopicListener topicListener) {
+ throw new UnsupportedOperationException("unregister topic");
+ }
+
+ @Override
+ public synchronized boolean start() {
+ if (alive) {
+ throw new IllegalStateException("topic already started");
+ }
+
+ alive = true;
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ if (!alive) {
+ throw new IllegalStateException("topic is not running");
+ }
+
+ alive = false;
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ alive = false;
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return alive;
+ }
+
+ @Override
+ public boolean lock() {
+ throw new UnsupportedOperationException("lock topicink");
+ }
+
+ @Override
+ public boolean unlock() {
+ throw new UnsupportedOperationException("unlock topic");
+ }
+
+ @Override
+ public boolean isLocked() {
+ throw new UnsupportedOperationException("topic isLocked");
+ }
+ }
+
+ /**
+ * Simulator for the feature-level factory.
+ */
+ private static class FeatureFactory extends PoolingFeature.Factory {
+
+ private final Context context;
+
+ /**
+ *
+ * @param context
+ */
+ public FeatureFactory(Context context) {
+ this.context = context;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public Properties getProperties(String featName) {
+ Properties props = new Properties();
+
+ props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+ props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
+ props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
+ props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
+ "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
+ "" + STD_OFFLINE_PUB_WAIT_MS);
+
+ return props;
+ }
+
+ @Override
+ public PolicyController getController(DroolsController droolsController) {
+ return context.getController(droolsController);
+ }
+ }
+
+ /**
+ * Simulator for the pooling manager factory.
+ */
+ private static class ManagerFactory extends PoolingManagerImpl.Factory {
+
+ /**
+ * Used to decode events from the external topic.
+ */
+ private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
+ @Override
+ protected ObjectMapper initialValue() {
+ return new ObjectMapper();
+ }
+ };
+
+ /**
+ * Used to decode events into a Map.
+ */
+ private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
+
+ /**
+ *
+ * @param context
+ */
+ public ManagerFactory(Context context) {
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public boolean canDecodeEvent(DroolsController drools, String topic) {
+ return true;
+ }
+
+ @Override
+ public Object decodeEvent(DroolsController drools, String topic, String event) {
+ try {
+ return mapper.get().readValue(event, typeRef);
+
+ } catch (IOException e) {
+ logger.warn("cannot decode external event", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Simulator for the dmaap manager factory.
+ */
+ private static class DmaapFactory extends DmaapManager.Factory {
+
+ private final Context context;
+
+ /**
+ *
+ * @param context
+ */
+ public DmaapFactory(Context context) {
+ this.context = context;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public List<TopicSource> initTopicSources(Properties props) {
+ return Arrays.asList(new TopicSourceImpl(context, true));
+ }
+
+ @Override
+ public List<TopicSink> initTopicSinks(Properties props) {
+ return Arrays.asList(new TopicSinkImpl(context));
+ }
+ }
+
+ /**
+ * Controller that also implements the {@link TopicListener} interface.
+ */
+ private static interface ListenerController extends PolicyController, TopicListener {
+
+ }
+
+ /**
+ * Simple function that takes no arguments and returns nothing.
+ */
+ @FunctionalInterface
+ private static interface VoidFunction {
+
+ public void apply();
+ }
+}