/*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
* Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.drools.pooling;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
* feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
* following are not:
- PolicyEngine, PolicyController, DroolsController
- mocked
*
*
* The following fields must be set before executing this:
- UEB_SERVERS
* - INTERNAL_TOPIC
- EXTERNAL_TOPIC
*/
public class EndToEndFeatureTest {
private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
/**
* KAFKA servers for both internal & external topics.
*/
private static final String SERVER = "localhost:9092";
/**
* Name of the topic used for inter-host communication.
*/
private static final String INTERNAL_TOPIC = "internal-topic";
/**
* Name of the topic from which "external" events "arrive".
*/
private static final String EXTERNAL_TOPIC = "external-topic";
/**
* Consumer group to use when polling the external topic.
*/
private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
/**
* Name of the controller.
*/
private static final String CONTROLLER1 = "controller.one";
/**
* Maximum number of items to fetch from Kafka in a single poll.
*/
private static final String FETCH_LIMIT = "5";
private static final long STD_REACTIVATE_WAIT_MS = 10000;
private static final long STD_IDENTIFICATION_MS = 10000;
private static final long STD_START_HEARTBEAT_MS = 15000;
private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
private static final long STD_INTER_HEARTBEAT_MS = 5000;
private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
private static final long EVENT_WAIT_SEC = 15;
/**
* Used to decode events from the external topic.
*/
private static final Gson mapper = new Gson();
/**
* Used to identify the current host.
*/
private static final ThreadLocal currentHost = new ThreadLocal();
/**
* Sink for external Kafka topic.
*/
private static TopicSink externalSink;
/**
* Sink for internal Kafka topic.
*/
private static TopicSink internalSink;
/**
* Context for the current test case.
*/
private Context ctx;
/**
* Setup before class.
*
*/
@BeforeAll
public static void setUpBeforeClass() {
externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
externalSink.start();
internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
internalSink.start();
}
/**
* Tear down after class.
*
*/
@AfterAll
public static void tearDownAfterClass() {
externalSink.stop();
internalSink.stop();
}
/**
* Setup.
*/
@BeforeEach
public void setUp() {
ctx = null;
}
/**
* Tear down.
*/
@AfterEach
public void tearDown() {
if (ctx != null) {
ctx.destroy();
}
}
/*
* This test should only be run manually, after configuring all the fields,
* thus it is ignored.
*/
@Disabled
@Test
public void test_SingleHost() throws Exception { // NOSONAR
run(70, 1);
}
/*
* This test should only be run manually, after configuring all the fields,
* thus it is ignored.
*/
@Disabled
@Test
public void test_TwoHosts() throws Exception { // NOSONAR
run(200, 2);
}
/*
* This test should only be run manually, after configuring all the fields,
* thus it is ignored.
*/
@Disabled
@Test
public void test_ThreeHosts() throws Exception { // NOSONAR
run(200, 3);
}
private void run(int nmessages, int nhosts) throws Exception {
ctx = new Context(nmessages);
for (int x = 0; x < nhosts; ++x) {
ctx.addHost();
}
ctx.startHosts();
ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
for (int x = 0; x < nmessages; ++x) {
ctx.offerExternal(makeMessage(x));
}
ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
assertEquals(0, ctx.getDecodeErrors());
assertEquals(0, ctx.getRemainingEvents());
ctx.checkAllSawAMsg();
}
private String makeMessage(int reqnum) {
return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
}
private static Properties makeSinkProperties(String topic) {
Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic);
props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
return props;
}
private static Properties makeSourceProperties(String topic) {
Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
// consumer instance is generated by the BusConsumer code
}
// else internal topic: feature populates info for internal topic
return props;
}
/**
* Decodes an event.
*
* @param event event
* @return the decoded event, or {@code null} if it cannot be decoded
*/
private static Object decodeEvent(String event) {
try {
return mapper.fromJson(event, TreeMap.class);
} catch (JsonParseException e) {
logger.warn("cannot decode external event", e);
return null;
}
}
/**
* Context used for a single test case.
*/
private static class Context {
/**
* Hosts that have been added to this context.
*/
private final Deque hosts = new LinkedList<>();
/**
* Maps a drools controller to its policy controller.
*/
private final IdentityHashMap drools2policy = new IdentityHashMap<>();
/**
* Counts the number of decode errors.
*/
private final AtomicInteger decodeErrors = new AtomicInteger(0);
/**
* Number of events we're still waiting to receive.
*/
private final CountDownLatch eventCounter;
/**
* Constructor.
*
* @param events number of events to be processed
*/
public Context(int events) {
eventCounter = new CountDownLatch(events);
}
/**
* Destroys the context, stopping any hosts that remain.
*/
public void destroy() {
stopHosts();
hosts.clear();
}
/**
* Creates and adds a new host to the context.
*
* @return the new Host
*/
public Host addHost() {
Host host = new Host(this);
hosts.add(host);
return host;
}
/**
* Starts the hosts.
*/
public void startHosts() {
hosts.forEach(Host::start);
}
/**
* Stops the hosts.
*/
public void stopHosts() {
hosts.forEach(Host::stop);
}
/**
* Verifies that all hosts processed at least one message.
*/
public void checkAllSawAMsg() {
int msgs = 0;
for (Host host : hosts) {
assertTrue(host.messageSeen(), "msgs=" + msgs);
++msgs;
}
}
/**
* Offers an event to the external topic.
*
* @param event event
*/
public void offerExternal(String event) {
externalSink.send(event);
}
/**
* Associates a controller with its drools controller.
*
* @param controller controller
* @param droolsController drools controller
*/
public void addController(PolicyController controller, DroolsController droolsController) {
drools2policy.put(droolsController, controller);
}
/**
* Get controller.
*
* @param droolsController drools controller
* @return the controller associated with a drools controller, or {@code null} if it has no
* associated controller
*/
public PolicyController getController(DroolsController droolsController) {
return drools2policy.get(droolsController);
}
/**
* Get decode errors.
*
* @return the number of decode errors so far
*/
public int getDecodeErrors() {
return decodeErrors.get();
}
/**
* Increments the count of decode errors.
*/
public void bumpDecodeErrors() {
decodeErrors.incrementAndGet();
}
/**
* Get remaining events.
*
* @return the number of events that haven't been processed
*/
public long getRemainingEvents() {
return eventCounter.getCount();
}
/**
* Adds an event to the counter.
*/
public void addEvent() {
eventCounter.countDown();
}
/**
* Waits, for a period of time, for all events to be processed.
*
* @param time time
* @param units units
* @return {@code true} if all events have been processed, {@code false} otherwise
* @throws InterruptedException throws interrupted exception
*/
public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
return eventCounter.await(time, units);
}
/**
* Waits, for a period of time, for all hosts to enter the Active state.
*
* @param timeMs maximum time to wait, in milliseconds
* @throws InterruptedException throws interrupted exception
*/
public void awaitAllActive(long timeMs) throws InterruptedException {
long tend = timeMs + System.currentTimeMillis();
for (Host host : hosts) {
long tremain = Math.max(0, tend - System.currentTimeMillis());
assertTrue(host.awaitActive(tremain));
}
}
}
/**
* Simulates a single "host".
*/
private static class Host {
private final PoolingFeature feature;
/**
* {@code True} if this host has processed a message, {@code false} otherwise.
*/
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
private final TopicSource externalSource;
private final TopicSource internalSource;
// mock objects
private final PolicyEngine engine = mock(PolicyEngine.class);
private final ListenerController controller = mock(ListenerController.class);
private final DroolsController drools = mock(DroolsController.class);
/**
* Constructor.
*
* @param context context
*/
public Host(Context context) {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
externalSource = TopicEndpointManager.getManager()
.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
internalSource = TopicEndpointManager.getManager()
.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
externalSource.unregister(controller);
return true;
});
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
context.addController(controller, drools);
feature = new PoolingFeatureImpl(context, this);
}
/**
* Waits, for a period of time, for the host to enter the Active state.
*
* @param timeMs time to wait, in milliseconds
* @return {@code true} if the host entered the Active state within the given amount of
* time, {@code false} otherwise
* @throws InterruptedException throws interrupted exception
*/
public boolean awaitActive(long timeMs) throws InterruptedException {
return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
}
/**
* Starts threads for the host so that it begins consuming from both the external "DMaaP"
* topic and its own internal "DMaaP" topic.
*/
public void start() {
feature.beforeStart(engine);
feature.afterCreate(controller);
feature.beforeStart(controller);
// start consuming events from the external topic
externalSource.register(controller);
feature.afterStart(controller);
}
/**
* Stops the host's threads.
*/
public void stop() {
feature.beforeStop(controller);
externalSource.unregister(controller);
feature.afterStop(controller);
}
/**
* Offers an event to the feature, before the policy controller handles it.
*
* @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled, {@code false} otherwise
*/
public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
return feature.beforeOffer(controller, protocol, topic2, event);
}
/**
* Offers an event to the feature, after the policy controller handles it.
*
* @param protocol protocol
* @param topic topic
* @param event event
* @param success success
* @return {@code true} if the event was handled, {@code false} otherwise
*/
public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
return feature.afterOffer(controller, protocol, topic, event, success);
}
/**
* Offers an event to the feature, before the drools controller handles it.
*
* @param fact fact
* @return {@code true} if the event was handled, {@code false} otherwise
*/
public boolean beforeInsert(Object fact) {
return feature.beforeInsert(drools, fact);
}
/**
* Offers an event to the feature, after the drools controller handles it.
*
* @param fact fact
* @param successInsert {@code true} if it was successfully inserted by the drools
* controller, {@code false} otherwise
* @return {@code true} if the event was handled, {@code false} otherwise
*/
public boolean afterInsert(Object fact, boolean successInsert) {
return feature.afterInsert(drools, fact, successInsert);
}
/**
* Indicates that a message was seen for this host.
*/
public void sawMessage() {
sawMsg.set(true);
}
/**
* Message seen.
*
* @return {@code true} if a message was seen for this host, {@code false} otherwise
*/
public boolean messageSeen() {
return sawMsg.get();
}
}
/**
* Listener for the external topic. Simulates the actions taken by
* AggregatedPolicyController.onTopicEvent.
*/
private static class MyExternalTopicListener implements Answer {
private final Context context;
private final Host host;
public MyExternalTopicListener(Context context, Host host) {
this.context = context;
this.host = host;
}
@Override
public Void answer(InvocationOnMock args) throws Throwable {
int index = 0;
CommInfrastructure commType = args.getArgument(index++);
String topic = args.getArgument(index++);
String event = args.getArgument(index++);
if (host.beforeOffer(commType, topic, event)) {
return null;
}
boolean result;
Object fact = decodeEvent(event);
if (fact == null) {
result = false;
context.bumpDecodeErrors();
} else {
result = true;
if (!host.beforeInsert(fact)) {
// feature did not handle it so we handle it here
host.afterInsert(fact, true);
host.sawMessage();
context.addEvent();
}
}
host.afterOffer(commType, topic, event, result);
return null;
}
}
/**
* Feature with overrides.
*/
private static class PoolingFeatureImpl extends PoolingFeature {
private final Context context;
private final Host host;
/**
* Constructor.
*
* @param context context
*/
public PoolingFeatureImpl(Context context, Host host) {
this.context = context;
this.host = host;
/*
* Note: do NOT extract anything from "context" at this point, because it hasn't been
* fully initialized yet
*/
}
@Override
public Properties getProperties(String featName) {
Properties props = new Properties();
props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
"" + STD_OFFLINE_PUB_WAIT_MS);
props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
"" + STD_START_HEARTBEAT_MS);
props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
"" + STD_ACTIVE_HEARTBEAT_MS);
props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
"" + STD_INTER_HEARTBEAT_MS);
props.putAll(makeSinkProperties(INTERNAL_TOPIC));
props.putAll(makeSourceProperties(INTERNAL_TOPIC));
return props;
}
@Override
public PolicyController getController(DroolsController droolsController) {
return context.getController(droolsController);
}
/**
* Embeds a specializer within a property name, after the prefix.
*
* @param propnm property name into which it should be embedded
* @param spec specializer to be embedded
* @return the property name, with the specializer embedded within it
*/
private String specialize(String propnm, String spec) {
String suffix = propnm.substring(PREFIX.length());
return PREFIX + spec + "." + suffix;
}
@Override
protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
CountDownLatch activeLatch) {
/*
* Set this before creating the test, because the test's superclass
* constructor uses it before the test object has a chance to store it.
*/
currentHost.set(host);
return new PoolingManagerTest(hostName, controller, props, activeLatch);
}
}
/**
* Pooling Manager with overrides.
*/
private static class PoolingManagerTest extends PoolingManagerImpl {
/**
* Constructor.
*
* @param hostName the host
* @param controller the controller
* @param props the properties
* @param activeLatch the latch
*/
public PoolingManagerTest(String hostName, PolicyController controller,
PoolingProperties props, CountDownLatch activeLatch) {
super(hostName, controller, props, activeLatch);
}
@Override
protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
return new TopicMessageManagerImpl(topic);
}
@Override
protected boolean canDecodeEvent(DroolsController drools, String topic) {
return true;
}
@Override
protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
return decodeEvent(event);
}
}
/**
* DMaaP Manager with overrides.
*/
private static class TopicMessageManagerImpl extends TopicMessageManager {
/**
* Constructor.
*
* @param topic the topic
* @throws PoolingFeatureException if an error occurs
*/
public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
super(topic);
}
@Override
protected List getTopicSources() {
Host host = currentHost.get();
return Arrays.asList(host.internalSource, host.externalSource);
}
@Override
protected List getTopicSinks() {
return Arrays.asList(internalSink, externalSink);
}
}
/**
* Controller that also implements the {@link TopicListener} interface.
*/
private static interface ListenerController extends PolicyController, TopicListener {
}
}