/* * ============LICENSE_START======================================================= * ONAP * ================================================================================ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.drools.pooling; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX; import com.google.gson.Gson; import com.google.gson.JsonParseException; import java.util.Arrays; import java.util.Deque; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the * following are not:
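 * <dl>
 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
 * <dd>mocked</dd>
 * </dl>
 *
 * <p>The following fields must be set before executing this:
 * <ul>
 * <li>SERVER</li>
 * <li>INTERNAL_TOPIC</li>
 * <li>EXTERNAL_TOPIC</li>
 * </ul>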
*/ public class EndToEndFeatureTest { private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class); /** * KAFKA servers for both internal & external topics. */ private static final String SERVER = "localhost:9092"; /** * Name of the topic used for inter-host communication. */ private static final String INTERNAL_TOPIC = "internal-topic"; /** * Name of the topic from which "external" events "arrive". */ private static final String EXTERNAL_TOPIC = "external-topic"; /** * Consumer group to use when polling the external topic. */ private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName(); /** * Name of the controller. */ private static final String CONTROLLER1 = "controller.one"; /** * Maximum number of items to fetch from Kafka in a single poll. */ private static final String FETCH_LIMIT = "5"; private static final long STD_REACTIVATE_WAIT_MS = 10000; private static final long STD_IDENTIFICATION_MS = 10000; private static final long STD_START_HEARTBEAT_MS = 15000; private static final long STD_ACTIVE_HEARTBEAT_MS = 12000; private static final long STD_INTER_HEARTBEAT_MS = 5000; private static final long STD_OFFLINE_PUB_WAIT_MS = 2; private static final long EVENT_WAIT_SEC = 15; /** * Used to decode events from the external topic. */ private static final Gson mapper = new Gson(); /** * Used to identify the current host. */ private static final ThreadLocal currentHost = new ThreadLocal(); /** * Sink for external Kafka topic. */ private static TopicSink externalSink; /** * Sink for internal Kafka topic. */ private static TopicSink internalSink; /** * Context for the current test case. */ private Context ctx; /** * Setup before class. 
* */ @BeforeAll public static void setUpBeforeClass() { externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0); externalSink.start(); internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0); internalSink.start(); } /** * Tear down after class. * */ @AfterAll public static void tearDownAfterClass() { externalSink.stop(); internalSink.stop(); } /** * Setup. */ @BeforeEach public void setUp() { ctx = null; } /** * Tear down. */ @AfterEach public void tearDown() { if (ctx != null) { ctx.destroy(); } } /* * This test should only be run manually, after configuring all the fields, * thus it is ignored. */ @Disabled @Test public void test_SingleHost() throws Exception { // NOSONAR run(70, 1); } /* * This test should only be run manually, after configuring all the fields, * thus it is ignored. */ @Disabled @Test public void test_TwoHosts() throws Exception { // NOSONAR run(200, 2); } /* * This test should only be run manually, after configuring all the fields, * thus it is ignored. 
*/ @Disabled @Test public void test_ThreeHosts() throws Exception { // NOSONAR run(200, 3); } private void run(int nmessages, int nhosts) throws Exception { ctx = new Context(nmessages); for (int x = 0; x < nhosts; ++x) { ctx.addHost(); } ctx.startHosts(); ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2); for (int x = 0; x < nmessages; ++x) { ctx.offerExternal(makeMessage(x)); } ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); assertEquals(0, ctx.getDecodeErrors()); assertEquals(0, ctx.getRemainingEvents()); ctx.checkAllSawAMsg(); } private String makeMessage(int reqnum) { return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; } private static Properties makeSinkProperties(String topic) { Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic); props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); return props; } private static Properties makeSourceProperties(String topic) { Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." 
+ topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); if (EXTERNAL_TOPIC.equals(topic)) { // consumer group is a constant props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP); // consumer instance is generated by the BusConsumer code } // else internal topic: feature populates info for internal topic return props; } /** * Decodes an event. * * @param event event * @return the decoded event, or {@code null} if it cannot be decoded */ private static Object decodeEvent(String event) { try { return mapper.fromJson(event, TreeMap.class); } catch (JsonParseException e) { logger.warn("cannot decode external event", e); return null; } } /** * Context used for a single test case. */ private static class Context { /** * Hosts that have been added to this context. */ private final Deque hosts = new LinkedList<>(); /** * Maps a drools controller to its policy controller. */ private final IdentityHashMap drools2policy = new IdentityHashMap<>(); /** * Counts the number of decode errors. */ private final AtomicInteger decodeErrors = new AtomicInteger(0); /** * Number of events we're still waiting to receive. */ private final CountDownLatch eventCounter; /** * Constructor. * * @param events number of events to be processed */ public Context(int events) { eventCounter = new CountDownLatch(events); } /** * Destroys the context, stopping any hosts that remain. */ public void destroy() { stopHosts(); hosts.clear(); } /** * Creates and adds a new host to the context. * * @return the new Host */ public Host addHost() { Host host = new Host(this); hosts.add(host); return host; } /** * Starts the hosts. */ public void startHosts() { hosts.forEach(Host::start); } /** * Stops the hosts. */ public void stopHosts() { hosts.forEach(Host::stop); } /** * Verifies that all hosts processed at least one message. 
*/ public void checkAllSawAMsg() { int msgs = 0; for (Host host : hosts) { assertTrue(host.messageSeen(), "msgs=" + msgs); ++msgs; } } /** * Offers an event to the external topic. * * @param event event */ public void offerExternal(String event) { externalSink.send(event); } /** * Associates a controller with its drools controller. * * @param controller controller * @param droolsController drools controller */ public void addController(PolicyController controller, DroolsController droolsController) { drools2policy.put(droolsController, controller); } /** * Get controller. * * @param droolsController drools controller * @return the controller associated with a drools controller, or {@code null} if it has no * associated controller */ public PolicyController getController(DroolsController droolsController) { return drools2policy.get(droolsController); } /** * Get decode errors. * * @return the number of decode errors so far */ public int getDecodeErrors() { return decodeErrors.get(); } /** * Increments the count of decode errors. */ public void bumpDecodeErrors() { decodeErrors.incrementAndGet(); } /** * Get remaining events. * * @return the number of events that haven't been processed */ public long getRemainingEvents() { return eventCounter.getCount(); } /** * Adds an event to the counter. */ public void addEvent() { eventCounter.countDown(); } /** * Waits, for a period of time, for all events to be processed. * * @param time time * @param units units * @return {@code true} if all events have been processed, {@code false} otherwise * @throws InterruptedException throws interrupted exception */ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { return eventCounter.await(time, units); } /** * Waits, for a period of time, for all hosts to enter the Active state. 
* * @param timeMs maximum time to wait, in milliseconds * @throws InterruptedException throws interrupted exception */ public void awaitAllActive(long timeMs) throws InterruptedException { long tend = timeMs + System.currentTimeMillis(); for (Host host : hosts) { long tremain = Math.max(0, tend - System.currentTimeMillis()); assertTrue(host.awaitActive(tremain)); } } } /** * Simulates a single "host". */ private static class Host { private final PoolingFeature feature; /** * {@code True} if this host has processed a message, {@code false} otherwise. */ private final AtomicBoolean sawMsg = new AtomicBoolean(false); private final TopicSource externalSource; private final TopicSource internalSource; // mock objects private final PolicyEngine engine = mock(PolicyEngine.class); private final ListenerController controller = mock(ListenerController.class); private final DroolsController drools = mock(DroolsController.class); /** * Constructor. * * @param context context */ public Host(Context context) { when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); externalSource = TopicEndpointManager.getManager() .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0); internalSource = TopicEndpointManager.getManager() .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0); // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { externalSource.unregister(controller); return true; }); doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); context.addController(controller, drools); feature = new PoolingFeatureImpl(context, this); } /** * Waits, for a period of time, for the host to enter the Active state. 
* * @param timeMs time to wait, in milliseconds * @return {@code true} if the host entered the Active state within the given amount of * time, {@code false} otherwise * @throws InterruptedException throws interrupted exception */ public boolean awaitActive(long timeMs) throws InterruptedException { return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS); } /** * Starts threads for the host so that it begins consuming from both the external "DMaaP" * topic and its own internal "DMaaP" topic. */ public void start() { feature.beforeStart(engine); feature.afterCreate(controller); feature.beforeStart(controller); // start consuming events from the external topic externalSource.register(controller); feature.afterStart(controller); } /** * Stops the host's threads. */ public void stop() { feature.beforeStop(controller); externalSource.unregister(controller); feature.afterStop(controller); } /** * Offers an event to the feature, before the policy controller handles it. * * @param protocol protocol * @param topic2 topic * @param event event * @return {@code true} if the event was handled, {@code false} otherwise */ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { return feature.beforeOffer(controller, protocol, topic2, event); } /** * Offers an event to the feature, after the policy controller handles it. * * @param protocol protocol * @param topic topic * @param event event * @param success success * @return {@code true} if the event was handled, {@code false} otherwise */ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { return feature.afterOffer(controller, protocol, topic, event, success); } /** * Offers an event to the feature, before the drools controller handles it. 
* * @param fact fact * @return {@code true} if the event was handled, {@code false} otherwise */ public boolean beforeInsert(Object fact) { return feature.beforeInsert(drools, fact); } /** * Offers an event to the feature, after the drools controller handles it. * * @param fact fact * @param successInsert {@code true} if it was successfully inserted by the drools * controller, {@code false} otherwise * @return {@code true} if the event was handled, {@code false} otherwise */ public boolean afterInsert(Object fact, boolean successInsert) { return feature.afterInsert(drools, fact, successInsert); } /** * Indicates that a message was seen for this host. */ public void sawMessage() { sawMsg.set(true); } /** * Message seen. * * @return {@code true} if a message was seen for this host, {@code false} otherwise */ public boolean messageSeen() { return sawMsg.get(); } } /** * Listener for the external topic. Simulates the actions taken by * AggregatedPolicyController.onTopicEvent. */ private static class MyExternalTopicListener implements Answer { private final Context context; private final Host host; public MyExternalTopicListener(Context context, Host host) { this.context = context; this.host = host; } @Override public Void answer(InvocationOnMock args) throws Throwable { int index = 0; CommInfrastructure commType = args.getArgument(index++); String topic = args.getArgument(index++); String event = args.getArgument(index++); if (host.beforeOffer(commType, topic, event)) { return null; } boolean result; Object fact = decodeEvent(event); if (fact == null) { result = false; context.bumpDecodeErrors(); } else { result = true; if (!host.beforeInsert(fact)) { // feature did not handle it so we handle it here host.afterInsert(fact, true); host.sawMessage(); context.addEvent(); } } host.afterOffer(commType, topic, event, result); return null; } } /** * Feature with overrides. 
*/ private static class PoolingFeatureImpl extends PoolingFeature { private final Context context; private final Host host; /** * Constructor. * * @param context context */ public PoolingFeatureImpl(Context context, Host host) { this.context = context; this.host = host; /* * Note: do NOT extract anything from "context" at this point, because it hasn't been * fully initialized yet */ } @Override public Properties getProperties(String featName) { Properties props = new Properties(); props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true"); props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC); props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000"); props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000"); props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), "" + STD_OFFLINE_PUB_WAIT_MS); props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), "" + STD_START_HEARTBEAT_MS); props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS); props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS); props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1), "" + STD_ACTIVE_HEARTBEAT_MS); props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + STD_INTER_HEARTBEAT_MS); props.putAll(makeSinkProperties(INTERNAL_TOPIC)); props.putAll(makeSourceProperties(INTERNAL_TOPIC)); return props; } @Override public PolicyController getController(DroolsController droolsController) { return context.getController(droolsController); } /** * Embeds a specializer within a property name, after the prefix. 
* * @param propnm property name into which it should be embedded * @param spec specializer to be embedded * @return the property name, with the specializer embedded within it */ private String specialize(String propnm, String spec) { String suffix = propnm.substring(PREFIX.length()); return PREFIX + spec + "." + suffix; } @Override protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { /* * Set this before creating the test, because the test's superclass * constructor uses it before the test object has a chance to store it. */ currentHost.set(host); return new PoolingManagerTest(hostName, controller, props, activeLatch); } } /** * Pooling Manager with overrides. */ private static class PoolingManagerTest extends PoolingManagerImpl { /** * Constructor. * * @param hostName the host * @param controller the controller * @param props the properties * @param activeLatch the latch */ public PoolingManagerTest(String hostName, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { super(hostName, controller, props, activeLatch); } @Override protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException { return new TopicMessageManagerImpl(topic); } @Override protected boolean canDecodeEvent(DroolsController drools, String topic) { return true; } @Override protected Object decodeEventWrapper(DroolsController drools, String topic, String event) { return decodeEvent(event); } } /** * DMaaP Manager with overrides. */ private static class TopicMessageManagerImpl extends TopicMessageManager { /** * Constructor. 
* * @param topic the topic * @throws PoolingFeatureException if an error occurs */ public TopicMessageManagerImpl(String topic) throws PoolingFeatureException { super(topic); } @Override protected List getTopicSources() { Host host = currentHost.get(); return Arrays.asList(host.internalSource, host.externalSource); } @Override protected List getTopicSinks() { return Arrays.asList(internalSink, externalSink); } } /** * Controller that also implements the {@link TopicListener} interface. */ private static interface ListenerController extends PolicyController, TopicListener { } }