diff options
Diffstat (limited to 'feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java')
-rw-r--r-- | feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java | 810 |
1 files changed, 810 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java new file mode 100644 index 00000000..31ad207c --- /dev/null +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java @@ -0,0 +1,810 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX; + +import com.google.gson.Gson; +import com.google.gson.JsonParseException; +import java.util.Arrays; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own + * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the + * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd> + * </dl> + * + * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li> + * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul> + */ +public class EndToEndFeatureTest { + + private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class); + + /** + * UEB servers for both internal & external topics. + */ + private static final String UEB_SERVERS = "ueb-server"; + + /** + * Name of the topic used for inter-host communication. + */ + private static final String INTERNAL_TOPIC = "internal-topic"; + + /** + * Name of the topic from which "external" events "arrive". + */ + private static final String EXTERNAL_TOPIC = "external-topic"; + + /** + * Consumer group to use when polling the external topic. + */ + private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName(); + + /** + * Name of the controller. + */ + private static final String CONTROLLER1 = "controller.one"; + + /** + * Maximum number of items to fetch from DMaaP in a single poll. + */ + private static final String FETCH_LIMIT = "5"; + + private static final long STD_REACTIVATE_WAIT_MS = 10000; + private static final long STD_IDENTIFICATION_MS = 10000; + private static final long STD_START_HEARTBEAT_MS = 15000; + private static final long STD_ACTIVE_HEARTBEAT_MS = 12000; + private static final long STD_INTER_HEARTBEAT_MS = 5000; + private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + private static final long EVENT_WAIT_SEC = 15; + + /** + * Used to decode events from the external topic. + */ + private static final Gson mapper = new Gson(); + + /** + * Used to identify the current host. + */ + private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>(); + + /** + * Sink for external DMaaP topic. + */ + private static TopicSink externalSink; + + /** + * Sink for internal DMaaP topic. + */ + private static TopicSink internalSink; + + /** + * Context for the current test case. + */ + private Context ctx; + + /** + * Setup before class. + * + */ + @BeforeAll + public static void setUpBeforeClass() { + externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0); + externalSink.start(); + + internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0); + internalSink.start(); + } + + /** + * Tear down after class. + * + */ + @AfterAll + public static void tearDownAfterClass() { + externalSink.stop(); + internalSink.stop(); + } + + /** + * Setup. + */ + @BeforeEach + public void setUp() { + ctx = null; + } + + /** + * Tear down. + */ + @AfterEach + public void tearDown() { + if (ctx != null) { + ctx.destroy(); + } + } + + /* + * This test should only be run manually, after configuring all the fields, + * thus it is ignored. + */ + @Disabled + @Test + public void test_SingleHost() throws Exception { // NOSONAR + run(70, 1); + } + + /* + * This test should only be run manually, after configuring all the fields, + * thus it is ignored. + */ + @Disabled + @Test + public void test_TwoHosts() throws Exception { // NOSONAR + run(200, 2); + } + + /* + * This test should only be run manually, after configuring all the fields, + * thus it is ignored. + */ + @Disabled + @Test + public void test_ThreeHosts() throws Exception { // NOSONAR + run(200, 3); + } + + private void run(int nmessages, int nhosts) throws Exception { + ctx = new Context(nmessages); + + for (int x = 0; x < nhosts; ++x) { + ctx.addHost(); + } + + ctx.startHosts(); + ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + private String makeMessage(int reqnum) { + return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; + } + + private static Properties makeSinkProperties(String topic) { + Properties props = new Properties(); + + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic); + + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); + + return props; + } + + private static Properties makeSourceProperties(String topic) { + Properties props = new Properties(); + + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic); + + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); + + if (EXTERNAL_TOPIC.equals(topic)) { + // consumer group is a constant + props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP); + + // consumer instance is generated by the BusConsumer code + } + + // else internal topic: feature populates info for internal topic + + return props; + } + + /** + * Decodes an event. + * + * @param event event + * @return the decoded event, or {@code null} if it cannot be decoded + */ + private static Object decodeEvent(String event) { + try { + return mapper.fromJson(event, TreeMap.class); + + } catch (JsonParseException e) { + logger.warn("cannot decode external event", e); + return null; + } + } + + /** + * Context used for a single test case. + */ + private static class Context { + + /** + * Hosts that have been added to this context. + */ + private final Deque<Host> hosts = new LinkedList<>(); + + /** + * Maps a drools controller to its policy controller. + */ + private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); + + /** + * Counts the number of decode errors. + */ + private final AtomicInteger decodeErrors = new AtomicInteger(0); + + /** + * Number of events we're still waiting to receive. + */ + private final CountDownLatch eventCounter; + + /** + * Constructor. + * + * @param events number of events to be processed + */ + public Context(int events) { + eventCounter = new CountDownLatch(events); + } + + /** + * Destroys the context, stopping any hosts that remain. + */ + public void destroy() { + stopHosts(); + hosts.clear(); + } + + /** + * Creates and adds a new host to the context. + * + * @return the new Host + */ + public Host addHost() { + Host host = new Host(this); + hosts.add(host); + + return host; + } + + /** + * Starts the hosts. + */ + public void startHosts() { + hosts.forEach(host -> host.start()); + } + + /** + * Stops the hosts. + */ + public void stopHosts() { + hosts.forEach(host -> host.stop()); + } + + /** + * Verifies that all hosts processed at least one message. + */ + public void checkAllSawAMsg() { + int msgs = 0; + for (Host host : hosts) { + assertTrue(host.messageSeen(), "msgs=" + msgs); + ++msgs; + } + } + + /** + * Offers an event to the external topic. + * + * @param event event + */ + public void offerExternal(String event) { + externalSink.send(event); + } + + /** + * Associates a controller with its drools controller. + * + * @param controller controller + * @param droolsController drools controller + */ + public void addController(PolicyController controller, DroolsController droolsController) { + drools2policy.put(droolsController, controller); + } + + /** + * Get controller. + * + * @param droolsController drools controller + * @return the controller associated with a drools controller, or {@code null} if it has no + * associated controller + */ + public PolicyController getController(DroolsController droolsController) { + return drools2policy.get(droolsController); + } + + /** + * Get decode errors. + * + * @return the number of decode errors so far + */ + public int getDecodeErrors() { + return decodeErrors.get(); + } + + /** + * Increments the count of decode errors. + */ + public void bumpDecodeErrors() { + decodeErrors.incrementAndGet(); + } + + /** + * Get remaining events. + * + * @return the number of events that haven't been processed + */ + public long getRemainingEvents() { + return eventCounter.getCount(); + } + + /** + * Adds an event to the counter. + */ + public void addEvent() { + eventCounter.countDown(); + } + + /** + * Waits, for a period of time, for all events to be processed. + * + * @param time time + * @param units units + * @return {@code true} if all events have been processed, {@code false} otherwise + * @throws InterruptedException throws interrupted exception + */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { + return eventCounter.await(time, units); + } + + /** + * Waits, for a period of time, for all hosts to enter the Active state. + * + * @param timeMs maximum time to wait, in milliseconds + * @throws InterruptedException throws interrupted exception + */ + public void awaitAllActive(long timeMs) throws InterruptedException { + long tend = timeMs + System.currentTimeMillis(); + + for (Host host : hosts) { + long tremain = Math.max(0, tend - System.currentTimeMillis()); + assertTrue(host.awaitActive(tremain)); + } + } + } + + /** + * Simulates a single "host". + */ + private static class Host { + + private final PoolingFeature feature; + + /** + * {@code True} if this host has processed a message, {@code false} otherwise. + */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); + + private final TopicSource externalSource; + private final TopicSource internalSource; + + // mock objects + private final PolicyEngine engine = mock(PolicyEngine.class); + private final ListenerController controller = mock(ListenerController.class); + private final DroolsController drools = mock(DroolsController.class); + + /** + * Constructor. + * + * @param context context + */ + public Host(Context context) { + + when(controller.getName()).thenReturn(CONTROLLER1); + when(controller.getDrools()).thenReturn(drools); + + externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)) + .get(0); + internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC)) + .get(0); + + // stop consuming events if the controller stops + when(controller.stop()).thenAnswer(args -> { + externalSource.unregister(controller); + return true; + }); + + doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); + + context.addController(controller, drools); + + feature = new PoolingFeatureImpl(context, this); + } + + /** + * Waits, for a period of time, for the host to enter the Active state. + * + * @param timeMs time to wait, in milliseconds + * @return {@code true} if the host entered the Active state within the given amount of + * time, {@code false} otherwise + * @throws InterruptedException throws interrupted exception + */ + public boolean awaitActive(long timeMs) throws InterruptedException { + return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS); + } + + /** + * Starts threads for the host so that it begins consuming from both the external "DMaaP" + * topic and its own internal "DMaaP" topic. + */ + public void start() { + feature.beforeStart(engine); + feature.afterCreate(controller); + + feature.beforeStart(controller); + + // start consuming events from the external topic + externalSource.register(controller); + + feature.afterStart(controller); + } + + /** + * Stops the host's threads. + */ + public void stop() { + feature.beforeStop(controller); + externalSource.unregister(controller); + feature.afterStop(controller); + } + + /** + * Offers an event to the feature, before the policy controller handles it. + * + * @param protocol protocol + * @param topic2 topic + * @param event event + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { + return feature.beforeOffer(controller, protocol, topic2, event); + } + + /** + * Offers an event to the feature, after the policy controller handles it. + * + * @param protocol protocol + * @param topic topic + * @param event event + * @param success success + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + + return feature.afterOffer(controller, protocol, topic, event, success); + } + + /** + * Offers an event to the feature, before the drools controller handles it. + * + * @param fact fact + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeInsert(Object fact) { + return feature.beforeInsert(drools, fact); + } + + /** + * Offers an event to the feature, after the drools controller handles it. + * + * @param fact fact + * @param successInsert {@code true} if it was successfully inserted by the drools + * controller, {@code false} otherwise + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterInsert(Object fact, boolean successInsert) { + return feature.afterInsert(drools, fact, successInsert); + } + + /** + * Indicates that a message was seen for this host. + */ + public void sawMessage() { + sawMsg.set(true); + } + + /** + * Message seen. + * + * @return {@code true} if a message was seen for this host, {@code false} otherwise + */ + public boolean messageSeen() { + return sawMsg.get(); + } + } + + /** + * Listener for the external topic. Simulates the actions taken by + * <i>AggregatedPolicyController.onTopicEvent</i>. + */ + private static class MyExternalTopicListener implements Answer<Void> { + + private final Context context; + private final Host host; + + public MyExternalTopicListener(Context context, Host host) { + this.context = context; + this.host = host; + } + + @Override + public Void answer(InvocationOnMock args) throws Throwable { + int index = 0; + CommInfrastructure commType = args.getArgument(index++); + String topic = args.getArgument(index++); + String event = args.getArgument(index++); + + if (host.beforeOffer(commType, topic, event)) { + return null; + } + + boolean result; + Object fact = decodeEvent(event); + + if (fact == null) { + result = false; + context.bumpDecodeErrors(); + + } else { + result = true; + + if (!host.beforeInsert(fact)) { + // feature did not handle it so we handle it here + host.afterInsert(fact, result); + + host.sawMessage(); + context.addEvent(); + } + } + + host.afterOffer(commType, topic, event, result); + return null; + } + } + + /** + * Feature with overrides. + */ + private static class PoolingFeatureImpl extends PoolingFeature { + + private final Context context; + private final Host host; + + /** + * Constructor. + * + * @param context context + */ + public PoolingFeatureImpl(Context context, Host host) { + this.context = context; + this.host = host; + + /* + * Note: do NOT extract anything from "context" at this point, because it hasn't been + * fully initialized yet + */ + } + + @Override + public Properties getProperties(String featName) { + Properties props = new Properties(); + + props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); + + props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); + props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); + props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000"); + props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), + "" + STD_OFFLINE_PUB_WAIT_MS); + props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), + "" + STD_START_HEARTBEAT_MS); + props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS); + props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS); + props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1), + "" + STD_ACTIVE_HEARTBEAT_MS); + props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), + "" + STD_INTER_HEARTBEAT_MS); + + props.putAll(makeSinkProperties(INTERNAL_TOPIC)); + props.putAll(makeSourceProperties(INTERNAL_TOPIC)); + + return props; + } + + @Override + public PolicyController getController(DroolsController droolsController) { + return context.getController(droolsController); + } + + /** + * Embeds a specializer within a property name, after the prefix. + * + * @param propnm property name into which it should be embedded + * @param spec specializer to be embedded + * @return the property name, with the specializer embedded within it + */ + private String specialize(String propnm, String spec) { + String suffix = propnm.substring(PREFIX.length()); + return PREFIX + spec + "." + suffix; + } + + @Override + protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + + /* + * Set this before creating the test, because the test's superclass + * constructor uses it before the test object has a chance to store it. + */ + currentHost.set(host); + + return new PoolingManagerTest(hostName, controller, props, activeLatch); + } + } + + /** + * Pooling Manager with overrides. + */ + private static class PoolingManagerTest extends PoolingManagerImpl { + + /** + * Constructor. + * + * @param hostName the host + * @param controller the controller + * @param props the properties + * @param activeLatch the latch + */ + public PoolingManagerTest(String hostName, PolicyController controller, + PoolingProperties props, CountDownLatch activeLatch) { + + super(hostName, controller, props, activeLatch); + } + + @Override + protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException { + return new TopicMessageManagerImpl(topic); + } + + @Override + protected boolean canDecodeEvent(DroolsController drools, String topic) { + return true; + } + + @Override + protected Object decodeEventWrapper(DroolsController drools, String topic, String event) { + return decodeEvent(event); + } + } + + /** + * DMaaP Manager with overrides. + */ + private static class TopicMessageManagerImpl extends TopicMessageManager { + + /** + * Constructor. + * + * @param topic the topic + * @throws PoolingFeatureException if an error occurs + */ + public TopicMessageManagerImpl(String topic) throws PoolingFeatureException { + super(topic); + } + + @Override + protected List<TopicSource> getTopicSources() { + Host host = currentHost.get(); + return Arrays.asList(host.internalSource, host.externalSource); + } + + @Override + protected List<TopicSink> getTopicSinks() { + return Arrays.asList(internalSink, externalSink); + } + } + + /** + * Controller that also implements the {@link TopicListener} interface. + */ + private static interface ListenerController extends PolicyController, TopicListener { + + } +} |