summaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/test/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src/test/java/org')
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java810
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java1033
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java36
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java36
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java548
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java995
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java190
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java85
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java322
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/feature-pooling-messages.properties47
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java361
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java59
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java74
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java73
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java82
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java38
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java38
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportBasicMessageTester.java251
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportMessageWithAssignmentsTester.java103
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java470
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java98
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java121
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java396
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java444
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java184
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java466
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java282
27 files changed, 7642 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
new file mode 100644
index 00000000..31ad207c
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
@@ -0,0 +1,810 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
+ * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
+ * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
+ * </dl>
+ *
+ * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
+ * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
+ */
+public class EndToEndFeatureTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
+
+ /**
+ * UEB servers for both internal & external topics.
+ */
+ private static final String UEB_SERVERS = "ueb-server";
+
+ /**
+ * Name of the topic used for inter-host communication.
+ */
+ private static final String INTERNAL_TOPIC = "internal-topic";
+
+ /**
+ * Name of the topic from which "external" events "arrive".
+ */
+ private static final String EXTERNAL_TOPIC = "external-topic";
+
+ /**
+ * Consumer group to use when polling the external topic.
+ */
+ private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
+
+ /**
+ * Name of the controller.
+ */
+ private static final String CONTROLLER1 = "controller.one";
+
+ /**
+ * Maximum number of items to fetch from DMaaP in a single poll.
+ */
+ private static final String FETCH_LIMIT = "5";
+
+ private static final long STD_REACTIVATE_WAIT_MS = 10000;
+ private static final long STD_IDENTIFICATION_MS = 10000;
+ private static final long STD_START_HEARTBEAT_MS = 15000;
+ private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
+ private static final long STD_INTER_HEARTBEAT_MS = 5000;
+ private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ private static final long EVENT_WAIT_SEC = 15;
+
+ /**
+ * Used to decode events from the external topic.
+ */
+ private static final Gson mapper = new Gson();
+
+ /**
+ * Used to identify the current host.
+ */
+ private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
+
+ /**
+ * Sink for external DMaaP topic.
+ */
+ private static TopicSink externalSink;
+
+ /**
+ * Sink for internal DMaaP topic.
+ */
+ private static TopicSink internalSink;
+
+ /**
+ * Context for the current test case.
+ */
+ private Context ctx;
+
+ /**
+ * Setup before class.
+ *
+ */
+ @BeforeAll
+ public static void setUpBeforeClass() {
+ externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
+ externalSink.start();
+
+ internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+ internalSink.start();
+ }
+
+ /**
+ * Tear down after class.
+ *
+ */
+ @AfterAll
+ public static void tearDownAfterClass() {
+ externalSink.stop();
+ internalSink.stop();
+ }
+
+ /**
+ * Setup.
+ */
+ @BeforeEach
+ public void setUp() {
+ ctx = null;
+ }
+
+ /**
+ * Tear down.
+ */
+ @AfterEach
+ public void tearDown() {
+ if (ctx != null) {
+ ctx.destroy();
+ }
+ }
+
+ /*
+ * This test should only be run manually, after configuring all the fields,
+ * thus it is ignored.
+ */
+ @Disabled
+ @Test
+ public void test_SingleHost() throws Exception { // NOSONAR
+ run(70, 1);
+ }
+
+ /*
+ * This test should only be run manually, after configuring all the fields,
+ * thus it is ignored.
+ */
+ @Disabled
+ @Test
+ public void test_TwoHosts() throws Exception { // NOSONAR
+ run(200, 2);
+ }
+
+ /*
+ * This test should only be run manually, after configuring all the fields,
+ * thus it is ignored.
+ */
+ @Disabled
+ @Test
+ public void test_ThreeHosts() throws Exception { // NOSONAR
+ run(200, 3);
+ }
+
+ private void run(int nmessages, int nhosts) throws Exception {
+ ctx = new Context(nmessages);
+
+ for (int x = 0; x < nhosts; ++x) {
+ ctx.addHost();
+ }
+
+ ctx.startHosts();
+ ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ private String makeMessage(int reqnum) {
+ return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+ }
+
+ private static Properties makeSinkProperties(String topic) {
+ Properties props = new Properties();
+
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+ return props;
+ }
+
+ private static Properties makeSourceProperties(String topic) {
+ Properties props = new Properties();
+
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+ if (EXTERNAL_TOPIC.equals(topic)) {
+ // consumer group is a constant
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
+
+ // consumer instance is generated by the BusConsumer code
+ }
+
+ // else internal topic: feature populates info for internal topic
+
+ return props;
+ }
+
+ /**
+ * Decodes an event.
+ *
+ * @param event event
+ * @return the decoded event, or {@code null} if it cannot be decoded
+ */
+ private static Object decodeEvent(String event) {
+ try {
+ return mapper.fromJson(event, TreeMap.class);
+
+ } catch (JsonParseException e) {
+ logger.warn("cannot decode external event", e);
+ return null;
+ }
+ }
+
+ /**
+ * Context used for a single test case.
+ */
+ private static class Context {
+
+ /**
+ * Hosts that have been added to this context.
+ */
+ private final Deque<Host> hosts = new LinkedList<>();
+
+ /**
+ * Maps a drools controller to its policy controller.
+ */
+ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+ /**
+ * Counts the number of decode errors.
+ */
+ private final AtomicInteger decodeErrors = new AtomicInteger(0);
+
+ /**
+ * Number of events we're still waiting to receive.
+ */
+ private final CountDownLatch eventCounter;
+
+ /**
+ * Constructor.
+ *
+ * @param events number of events to be processed
+ */
+ public Context(int events) {
+ eventCounter = new CountDownLatch(events);
+ }
+
+ /**
+ * Destroys the context, stopping any hosts that remain.
+ */
+ public void destroy() {
+ stopHosts();
+ hosts.clear();
+ }
+
+ /**
+ * Creates and adds a new host to the context.
+ *
+ * @return the new Host
+ */
+ public Host addHost() {
+ Host host = new Host(this);
+ hosts.add(host);
+
+ return host;
+ }
+
+ /**
+ * Starts the hosts.
+ */
+ public void startHosts() {
+ hosts.forEach(host -> host.start());
+ }
+
+ /**
+ * Stops the hosts.
+ */
+ public void stopHosts() {
+ hosts.forEach(host -> host.stop());
+ }
+
+ /**
+ * Verifies that all hosts processed at least one message.
+ */
+ public void checkAllSawAMsg() {
+ int msgs = 0;
+ for (Host host : hosts) {
+ assertTrue(host.messageSeen(), "msgs=" + msgs);
+ ++msgs;
+ }
+ }
+
+ /**
+ * Offers an event to the external topic.
+ *
+ * @param event event
+ */
+ public void offerExternal(String event) {
+ externalSink.send(event);
+ }
+
+ /**
+ * Associates a controller with its drools controller.
+ *
+ * @param controller controller
+ * @param droolsController drools controller
+ */
+ public void addController(PolicyController controller, DroolsController droolsController) {
+ drools2policy.put(droolsController, controller);
+ }
+
+ /**
+ * Get controller.
+ *
+ * @param droolsController drools controller
+ * @return the controller associated with a drools controller, or {@code null} if it has no
+ * associated controller
+ */
+ public PolicyController getController(DroolsController droolsController) {
+ return drools2policy.get(droolsController);
+ }
+
+ /**
+ * Get decode errors.
+ *
+ * @return the number of decode errors so far
+ */
+ public int getDecodeErrors() {
+ return decodeErrors.get();
+ }
+
+ /**
+ * Increments the count of decode errors.
+ */
+ public void bumpDecodeErrors() {
+ decodeErrors.incrementAndGet();
+ }
+
+ /**
+ * Get remaining events.
+ *
+ * @return the number of events that haven't been processed
+ */
+ public long getRemainingEvents() {
+ return eventCounter.getCount();
+ }
+
+ /**
+ * Adds an event to the counter.
+ */
+ public void addEvent() {
+ eventCounter.countDown();
+ }
+
+ /**
+ * Waits, for a period of time, for all events to be processed.
+ *
+ * @param time time
+ * @param units units
+ * @return {@code true} if all events have been processed, {@code false} otherwise
+ * @throws InterruptedException throws interrupted exception
+ */
+ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+ return eventCounter.await(time, units);
+ }
+
+ /**
+ * Waits, for a period of time, for all hosts to enter the Active state.
+ *
+ * @param timeMs maximum time to wait, in milliseconds
+ * @throws InterruptedException throws interrupted exception
+ */
+ public void awaitAllActive(long timeMs) throws InterruptedException {
+ long tend = timeMs + System.currentTimeMillis();
+
+ for (Host host : hosts) {
+ long tremain = Math.max(0, tend - System.currentTimeMillis());
+ assertTrue(host.awaitActive(tremain));
+ }
+ }
+ }
+
+ /**
+ * Simulates a single "host".
+ */
+ private static class Host {
+
+ private final PoolingFeature feature;
+
+ /**
+ * {@code True} if this host has processed a message, {@code false} otherwise.
+ */
+ private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+ private final TopicSource externalSource;
+ private final TopicSource internalSource;
+
+ // mock objects
+ private final PolicyEngine engine = mock(PolicyEngine.class);
+ private final ListenerController controller = mock(ListenerController.class);
+ private final DroolsController drools = mock(DroolsController.class);
+
+ /**
+ * Constructor.
+ *
+ * @param context context
+ */
+ public Host(Context context) {
+
+ when(controller.getName()).thenReturn(CONTROLLER1);
+ when(controller.getDrools()).thenReturn(drools);
+
+ externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC))
+ .get(0);
+ internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC))
+ .get(0);
+
+ // stop consuming events if the controller stops
+ when(controller.stop()).thenAnswer(args -> {
+ externalSource.unregister(controller);
+ return true;
+ });
+
+ doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+ context.addController(controller, drools);
+
+ feature = new PoolingFeatureImpl(context, this);
+ }
+
+ /**
+ * Waits, for a period of time, for the host to enter the Active state.
+ *
+ * @param timeMs time to wait, in milliseconds
+ * @return {@code true} if the host entered the Active state within the given amount of
+ * time, {@code false} otherwise
+ * @throws InterruptedException throws interrupted exception
+ */
+ public boolean awaitActive(long timeMs) throws InterruptedException {
+ return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Starts threads for the host so that it begins consuming from both the external "DMaaP"
+ * topic and its own internal "DMaaP" topic.
+ */
+ public void start() {
+ feature.beforeStart(engine);
+ feature.afterCreate(controller);
+
+ feature.beforeStart(controller);
+
+ // start consuming events from the external topic
+ externalSource.register(controller);
+
+ feature.afterStart(controller);
+ }
+
+ /**
+ * Stops the host's threads.
+ */
+ public void stop() {
+ feature.beforeStop(controller);
+ externalSource.unregister(controller);
+ feature.afterStop(controller);
+ }
+
+ /**
+ * Offers an event to the feature, before the policy controller handles it.
+ *
+ * @param protocol protocol
+ * @param topic2 topic
+ * @param event event
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ return feature.beforeOffer(controller, protocol, topic2, event);
+ }
+
+ /**
+ * Offers an event to the feature, after the policy controller handles it.
+ *
+ * @param protocol protocol
+ * @param topic topic
+ * @param event event
+ * @param success success
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+ return feature.afterOffer(controller, protocol, topic, event, success);
+ }
+
+ /**
+ * Offers an event to the feature, before the drools controller handles it.
+ *
+ * @param fact fact
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeInsert(Object fact) {
+ return feature.beforeInsert(drools, fact);
+ }
+
+ /**
+ * Offers an event to the feature, after the drools controller handles it.
+ *
+ * @param fact fact
+ * @param successInsert {@code true} if it was successfully inserted by the drools
+ * controller, {@code false} otherwise
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterInsert(Object fact, boolean successInsert) {
+ return feature.afterInsert(drools, fact, successInsert);
+ }
+
+ /**
+ * Indicates that a message was seen for this host.
+ */
+ public void sawMessage() {
+ sawMsg.set(true);
+ }
+
+ /**
+ * Message seen.
+ *
+ * @return {@code true} if a message was seen for this host, {@code false} otherwise
+ */
+ public boolean messageSeen() {
+ return sawMsg.get();
+ }
+ }
+
+ /**
+ * Listener for the external topic. Simulates the actions taken by
+ * <i>AggregatedPolicyController.onTopicEvent</i>.
+ */
+ private static class MyExternalTopicListener implements Answer<Void> {
+
+ private final Context context;
+ private final Host host;
+
+ public MyExternalTopicListener(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+ }
+
+ @Override
+ public Void answer(InvocationOnMock args) throws Throwable {
+ int index = 0;
+ CommInfrastructure commType = args.getArgument(index++);
+ String topic = args.getArgument(index++);
+ String event = args.getArgument(index++);
+
+ if (host.beforeOffer(commType, topic, event)) {
+ return null;
+ }
+
+ boolean result;
+ Object fact = decodeEvent(event);
+
+ if (fact == null) {
+ result = false;
+ context.bumpDecodeErrors();
+
+ } else {
+ result = true;
+
+ if (!host.beforeInsert(fact)) {
+ // feature did not handle it so we handle it here
+ host.afterInsert(fact, result);
+
+ host.sawMessage();
+ context.addEvent();
+ }
+ }
+
+ host.afterOffer(commType, topic, event, result);
+ return null;
+ }
+ }
+
+ /**
+ * Feature with overrides.
+ */
+ private static class PoolingFeatureImpl extends PoolingFeature {
+
+ private final Context context;
+ private final Host host;
+
+ /**
+ * Constructor.
+ *
+ * @param context context
+ */
+ public PoolingFeatureImpl(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it hasn't been
+ * fully initialized yet
+ */
+ }
+
+ @Override
+ public Properties getProperties(String featName) {
+ Properties props = new Properties();
+
+ props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+ props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+ props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+ "" + STD_OFFLINE_PUB_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_START_HEARTBEAT_MS);
+ props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
+ props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
+ props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+ "" + STD_INTER_HEARTBEAT_MS);
+
+ props.putAll(makeSinkProperties(INTERNAL_TOPIC));
+ props.putAll(makeSourceProperties(INTERNAL_TOPIC));
+
+ return props;
+ }
+
+ @Override
+ public PolicyController getController(DroolsController droolsController) {
+ return context.getController(droolsController);
+ }
+
+ /**
+ * Embeds a specializer within a property name, after the prefix.
+ *
+ * @param propnm property name into which it should be embedded
+ * @param spec specializer to be embedded
+ * @return the property name, with the specializer embedded within it
+ */
+ private String specialize(String propnm, String spec) {
+ String suffix = propnm.substring(PREFIX.length());
+ return PREFIX + spec + "." + suffix;
+ }
+
+ @Override
+ protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+
+ /*
+ * Set this before creating the test, because the test's superclass
+ * constructor uses it before the test object has a chance to store it.
+ */
+ currentHost.set(host);
+
+ return new PoolingManagerTest(hostName, controller, props, activeLatch);
+ }
+ }
+
+ /**
+ * Pooling Manager with overrides.
+ */
+ private static class PoolingManagerTest extends PoolingManagerImpl {
+
+ /**
+ * Constructor.
+ *
+ * @param hostName the host
+ * @param controller the controller
+ * @param props the properties
+ * @param activeLatch the latch
+ */
+ public PoolingManagerTest(String hostName, PolicyController controller,
+ PoolingProperties props, CountDownLatch activeLatch) {
+
+ super(hostName, controller, props, activeLatch);
+ }
+
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManagerImpl(topic);
+ }
+
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools, String topic) {
+ return true;
+ }
+
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
+ return decodeEvent(event);
+ }
+ }
+
+ /**
+ * DMaaP Manager with overrides.
+ */
+ private static class TopicMessageManagerImpl extends TopicMessageManager {
+
+ /**
+ * Constructor.
+ *
+ * @param topic the topic
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
+ super(topic);
+ }
+
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ Host host = currentHost.get();
+ return Arrays.asList(host.internalSource, host.externalSource);
+ }
+
+ @Override
+ protected List<TopicSink> getTopicSinks() {
+ return Arrays.asList(internalSink, externalSink);
+ }
+ }
+
+ /**
+ * Controller that also implements the {@link TopicListener} interface.
+ */
+ private static interface ListenerController extends PolicyController, TopicListener {
+
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
new file mode 100644
index 00000000..e9bd3cb5
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -0,0 +1,1033 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object. Uses real feature objects. However, the following are not:
+ * <dl>
+ * <dt>DMaaP sources and sinks</dt>
+ * <dd>simulated using queues. There is one queue for the external topic, and one queue
+ * for each host's internal topic. Messages published to the "admin" channel are simply
+ * sent to all of the hosts' internal topic queues</dd>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
+ *
+ * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
+ */
+
+class FeatureTest {
+ private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
+ /**
+ * Name of the topic used for inter-host communication.
+ */
+ private static final String INTERNAL_TOPIC = "my.internal.topic";
+ /**
+ * Name of the topic from which "external" events "arrive".
+ */
+ private static final String EXTERNAL_TOPIC = "my.external.topic";
+ /**
+ * Name of the controller.
+ */
+ private static final String CONTROLLER1 = "controller.one";
+ private static long stdReactivateWaitMs = 200;
+ private static long stdIdentificationMs = 60;
+ private static long stdStartHeartbeatMs = 60;
+ private static long stdActiveHeartbeatMs = 50;
+ private static long stdInterHeartbeatMs = 5;
+ private static long stdOfflinePubWaitMs = 2;
+ private static long stdPollMs = 2;
+ private static long stdInterPollMs = 2;
+ private static long stdEventWaitSec = 10;
+ /**
+ * Used to decode events from the external topic.
+ */
+ private static final Gson mapper = new Gson();
+ /**
+ * Used to identify the current context.
+ */
+ private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
+ /**
+ * Context for the current test case.
+ */
+ private Context ctx;
+
+ /**
+ * Setup.
+ */
+
+ @BeforeEach
+ public void setUp() {
+ ctx = null;
+ }
+
+ /**
+ * Tear down.
+ */
+
+ @AfterEach
+ public void tearDown() {
+ if (ctx != null) {
+ ctx.destroy();
+ }
+ }
+
+ @Test
+ void test_SingleHost() throws Exception {
+ run(70, 1);
+ }
+
+ @Test
+ void test_TwoHosts() throws Exception {
+ run(200, 2);
+ }
+
+ @Test
+ void test_ThreeHosts() throws Exception {
+ run(200, 3);
+ }
+
+ private void run(int nmessages, int nhosts) throws Exception {
+ ctx = new Context(nmessages);
+ for (int x = 0; x < nhosts; ++x) {
+ ctx.addHost();
+ }
+ ctx.startHosts();
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+ ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ private String makeMessage(int reqnum) {
+ return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+ }
+
+ /**
+ * Invoke this to slow the timers down.
+ */
+
+ protected static void runSlow() {
+ stdReactivateWaitMs = 10000;
+ stdIdentificationMs = 10000;
+ stdStartHeartbeatMs = 15000;
+ stdActiveHeartbeatMs = 12000;
+ stdInterHeartbeatMs = 5000;
+ stdOfflinePubWaitMs = 2;
+ stdPollMs = 2;
+ stdInterPollMs = 2000;
+ stdEventWaitSec = 1000;
+ }
+
+ /**
+ * Decodes an event.
+ *
+ * @param event event
+ * @return the decoded event, or {@code null} if it cannot be decoded
+ */
+
+ private static Object decodeEvent(String event) {
+ try {
+ return mapper.fromJson(event, TreeMap.class);
+ } catch (JsonParseException e) {
+ logger.warn("cannot decode external event", e);
+ return null;
+ }
+ }
+
+ /**
+ * Context used for a single test case.
+ */
+
+ private static class Context {
+ /**
+ * Hosts that have been added to this context.
+ */
+ private final Deque<Host> hosts = new LinkedList<>();
+ /**
+ * Maps a drools controller to its policy controller.
+ */
+ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+ /**
+ * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
+ */
+ private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
+ /**
+ * Counts the number of decode errors.
+ */
+ private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
+ /**
+ * Number of events we're still waiting to receive.
+ */
+ private final CountDownLatch eventCounter;
+
+ /**
+ * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
+ * {@link #getCurrentHost()}.
+ */
+ @Getter
+ private Host currentHost = null;
+
+ /**
+ * Constructor.
+ *
+ * @param events number of events to be processed
+ */
+
+ public Context(int events) {
+ eventCounter = new CountDownLatch(events);
+ }
+
+ /**
+ * Destroys the context, stopping any hosts that remain.
+ */
+
+ public void destroy() {
+ stopHosts();
+ hosts.clear();
+ }
+
+ /**
+ * Creates and adds a new host to the context.
+ *
+ * @return the new Host
+ */
+
+ public Host addHost() {
+ Host host = new Host(this);
+ hosts.add(host);
+ return host;
+ }
+
+ /**
+ * Starts the hosts.
+ */
+
+ public void startHosts() {
+ hosts.forEach(host -> host.start());
+ }
+
+ /**
+ * Stops the hosts.
+ */
+
+ public void stopHosts() {
+ hosts.forEach(host -> host.stop());
+ }
+
+ /**
+ * Verifies that all hosts processed at least one message.
+ */
+
+ public void checkAllSawAMsg() {
+ int msgs = 0;
+ for (Host host : hosts) {
+ assertTrue(host.messageSeen(), "msgs=" + msgs);
+ ++msgs;
+ }
+ }
+
+ /**
+ * Sets {@link #currentHost} to the specified host, and then invokes the given
+ * function. Resets {@link #currentHost} to {@code null} before returning.
+ *
+ * @param host host
+ * @param func function to invoke
+ */
+
+ public void withHost(Host host, VoidFunction func) {
+ currentHost = host;
+ func.apply();
+ currentHost = null;
+ }
+
+ /**
+ * Offers an event to the external topic. As each host needs a copy, it is posted
+ * to each Host's queue.
+ *
+ * @param event event
+ */
+
+ public void offerExternal(String event) {
+ for (Host host : hosts) {
+ host.getExternalTopic().offer(event);
+ }
+ }
+
+ /**
+ * Adds an internal channel to the set of channels.
+ *
+ * @param channel channel
+ * @param queue the channel's queue
+ */
+
+ public void addInternal(String channel, BlockingQueue<String> queue) {
+ channel2queue.put(channel, queue);
+ }
+
+ /**
+ * Offers a message to all internal channels.
+ *
+ * @param message message
+ */
+
+ public void offerInternal(String message) {
+ channel2queue.values().forEach(queue -> queue.offer(message));
+ }
+
+ /**
+ * Associates a controller with its drools controller.
+ *
+ * @param controller controller
+ * @param droolsController drools controller
+ */
+
+ public void addController(PolicyController controller, DroolsController droolsController) {
+ drools2policy.put(droolsController, controller);
+ }
+
+ /**
+ * Get controller.
+ *
+ * @param droolsController drools controller
+ * @return the controller associated with a drools controller, or {@code null} if
+ * it has no associated controller
+ */
+
+ public PolicyController getController(DroolsController droolsController) {
+ return drools2policy.get(droolsController);
+ }
+
+ /**
+ * Get decode errors.
+ *
+ * @return the number of decode errors so far
+ */
+
+ public int getDecodeErrors() {
+ return numDecodeErrors.get();
+ }
+
+ /**
+ * Increments the count of decode errors.
+ */
+
+ public void bumpDecodeErrors() {
+ numDecodeErrors.incrementAndGet();
+ }
+
+ /**
+ * Get remaining events.
+ *
+ * @return the number of events that haven't been processed
+ */
+
+ public long getRemainingEvents() {
+ return eventCounter.getCount();
+ }
+
+ /**
+ * Adds an event to the counter.
+ */
+
+ public void addEvent() {
+ eventCounter.countDown();
+ }
+
+ /**
+ * Waits, for a period of time, for all events to be processed.
+ *
+ * @param time time
+ * @param units units
+ * @return {@code true} if all events have been processed, {@code false} otherwise
+ * @throws InterruptedException throws interrupted
+ */
+
+ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+ return eventCounter.await(time, units);
+ }
+
+ }
+
+ /**
+ * Simulates a single "host".
+ */
+
+ private static class Host {
+ private final Context context;
+ private final PoolingFeature feature;
+
+ /**
+ * {@code True} if this host has processed a message, {@code false} otherwise.
+ */
+
+ private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+ /**
+ * This host's internal "DMaaP" topic.
+ */
+
+ private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * Queue for the external "DMaaP" topic.
+ */
+ @Getter
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
+ /**
+ * Source that reads from the external topic and posts to the listener.
+ */
+
+ private TopicSource externalSource;
+
+ // mock objects
+ private final PolicyEngine engine = mock(PolicyEngine.class);
+ private final ListenerController controller = mock(ListenerController.class);
+ private final DroolsController drools = mock(DroolsController.class);
+
+ /**
+ * Constructor.
+ *
+ * @param context context
+ */
+
+ public Host(Context context) {
+ this.context = context;
+ when(controller.getName()).thenReturn(CONTROLLER1);
+ when(controller.getDrools()).thenReturn(drools);
+ // stop consuming events if the controller stops
+ when(controller.stop()).thenAnswer(args -> {
+ externalSource.unregister(controller);
+ return true;
+ });
+ doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+ context.addController(controller, drools);
+ // arrange to read from the external topic
+ externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
+ feature = new PoolingFeatureImpl(context);
+ }
+
+ /**
+ * Get name.
+ *
+ * @return the host name
+ */
+
+ public String getName() {
+ return feature.getHost();
+ }
+
+ /**
+ * Starts threads for the host so that it begins consuming from both the external
+ * "DMaaP" topic and its own internal "DMaaP" topic.
+ */
+
+ public void start() {
+ context.withHost(this, () -> {
+ feature.beforeStart(engine);
+ feature.afterCreate(controller);
+ // assign the queue for this host's internal topic
+ context.addInternal(getName(), msgQueue);
+ feature.beforeStart(controller);
+ // start consuming events from the external topic
+ externalSource.register(controller);
+ feature.afterStart(controller);
+ });
+ }
+
+ /**
+ * Stops the host's threads.
+ */
+
+ public void stop() {
+ feature.beforeStop(controller);
+ externalSource.unregister(controller);
+ feature.afterStop(controller);
+ }
+
+ /**
+ * Offers an event to the feature, before the policy controller handles it.
+ *
+ * @param protocol protocol
+ * @param topic2 topic
+ * @param event event
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ return feature.beforeOffer(controller, protocol, topic2, event);
+ }
+
+ /**
+ * Offers an event to the feature, after the policy controller handles it.
+ *
+ * @param protocol protocol
+ * @param topic topic
+ * @param event event
+ * @param success success
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+ return feature.afterOffer(controller, protocol, topic, event, success);
+ }
+
+ /**
+ * Offers an event to the feature, before the drools controller handles it.
+ *
+ * @param fact fact
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+
+ public boolean beforeInsert(Object fact) {
+ return feature.beforeInsert(drools, fact);
+ }
+
+ /**
+ * Offers an event to the feature, after the drools controller handles it.
+ *
+ * @param fact fact
+ * @param successInsert {@code true} if it was successfully inserted by the drools
+ * controller, {@code false} otherwise
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+
+ public boolean afterInsert(Object fact, boolean successInsert) {
+ return feature.afterInsert(drools, fact, successInsert);
+ }
+
+ /**
+ * Indicates that a message was seen for this host.
+ */
+
+ public void sawMessage() {
+ sawMsg.set(true);
+ }
+
+ /**
+ * Message seen.
+ *
+ * @return {@code true} if a message was seen for this host, {@code false} otherwise
+ */
+
+ public boolean messageSeen() {
+ return sawMsg.get();
+ }
+
+ /**
+ * Get internal queue.
+ *
+ * @return the queue associated with this host's internal topic
+ */
+
+ public BlockingQueue<String> getInternalQueue() {
+ return msgQueue;
+ }
+ }
+
+ /**
+ * Listener for the external topic. Simulates the actions taken by
+ * <i>AggregatedPolicyController.onTopicEvent</i>.
+ */
+
+ private static class MyExternalTopicListener implements Answer<Void> {
+ private final Context context;
+ private final Host host;
+
+ public MyExternalTopicListener(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+ }
+
+ @Override
+ public Void answer(InvocationOnMock args) throws Throwable {
+ int index = 0;
+ CommInfrastructure commType = args.getArgument(index++);
+ String topic = args.getArgument(index++);
+ String event = args.getArgument(index++);
+ if (host.beforeOffer(commType, topic, event)) {
+ return null;
+ }
+ boolean result;
+ Object fact = decodeEvent(event);
+ if (fact == null) {
+ result = false;
+ context.bumpDecodeErrors();
+ } else {
+ result = true;
+ if (!host.beforeInsert(fact)) {
+ // feature did not handle it so we handle it here
+ host.afterInsert(fact, result);
+ host.sawMessage();
+ context.addEvent();
+ }
+ }
+ host.afterOffer(commType, topic, event, result);
+ return null;
+ }
+ }
+
+ /**
+ * Sink implementation that puts a message on the queue specified by the
+ * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
+ * message is placed on all queues.
+ */
+
+ private static class TopicSinkImpl extends TopicImpl implements TopicSink {
+ private final Context context;
+
+ /**
+ * Constructor.
+ *
+ * @param context context
+ */
+
+ public TopicSinkImpl(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public synchronized boolean send(String message) {
+ if (!isAlive()) {
+ return false;
+ }
+ try {
+ context.offerInternal(message);
+ return true;
+ } catch (JsonParseException e) {
+ logger.warn("could not decode message: {}", message);
+ context.bumpDecodeErrors();
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Source implementation that reads from a queue associated with a topic.
+ */
+
+ private static class TopicSourceImpl extends TopicImpl implements TopicSource {
+
+ private final String topic;
+ /**
+ * Queue from which to retrieve messages.
+ */
+ private final BlockingQueue<String> queue;
+ /**
+ * Manages the current consumer thread. The "first" item is used as a trigger to
+ * tell the thread to stop processing, while the "second" item is triggered <i>by
+ * the thread</i> when it completes.
+ */
+ private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
+
+ /**
+ * Constructor.
+ *
+ * @param type topic type
+ * @param queue topic from which to read
+ */
+
+ public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+ this.topic = type;
+ this.queue = queue;
+ }
+
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean offer(String event) {
+ throw new UnsupportedOperationException("offer topic source");
+ }
+
+ /**
+ * Starts a thread that takes messages from the queue and gives them to the
+ * listener. Stops the thread of any previously registered listener.
+ */
+
+ @Override
+ public void register(TopicListener listener) {
+ Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
+ reregister(newPair);
+ Thread thread = new Thread(() -> {
+ try {
+ do {
+ processMessages(newPair.getLeft(), listener);
+ } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
+ logger.info("topic source thread completed");
+ } catch (InterruptedException e) {
+ logger.warn("topic source thread aborted", e);
+ Thread.currentThread().interrupt();
+ } catch (RuntimeException e) {
+ logger.warn("topic source thread aborted", e);
+ }
+ newPair.getRight().countDown();
+ });
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Stops the thread of <i>any</i> currently registered listener.
+ */
+
+ @Override
+ public void unregister(TopicListener listener) {
+ reregister(null);
+ }
+
+ /**
+ * Registers a new "pair" with this source, stopping the consumer associated with
+ * any previous registration.
+ *
+ * @param newPair the new "pair", or {@code null} to unregister
+ */
+
+ private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
+ try {
+ Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
+ if (oldPair == null) {
+ if (newPair == null) {
+ // unregister was invoked twice in a row
+ logger.warn("re-unregister for topic source");
+ }
+ // no previous thread to stop
+ return;
+ }
+ // need to stop the previous thread
+ // tell it to stop
+ oldPair.getLeft().countDown();
+ // wait for it to stop
+ if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
+ logger.warn("old topic registration is still running");
+ }
+ } catch (InterruptedException e) {
+ logger.warn("old topic registration may still be running", e);
+ Thread.currentThread().interrupt();
+ }
+ if (newPair != null) {
+ // register was invoked twice in a row
+ logger.warn("re-register for topic source");
+ }
+ }
+
+ /**
+ * Polls for messages from the topic and offers them to the listener.
+ *
+ * @param stopped triggered if processing should stop
+ * @param listener listener
+ * @throws InterruptedException throws interrupted exception
+ */
+
+ private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
+ for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
+ String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
+ if (msg == null) {
+ return;
+ }
+ listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
+ }
+ }
+ }
+
+ /**
+ * Topic implementation. Most methods just throw
+ * {@link UnsupportedOperationException}.
+ */
+
+ private static class TopicImpl implements Topic {
+
+ /**
+ * Constructor.
+ */
+
+ public TopicImpl() {
+ super();
+ }
+
+ @Override
+ public String getTopic() {
+ return INTERNAL_TOPIC;
+ }
+
+ @Override
+ public String getEffectiveTopic() {
+ return INTERNAL_TOPIC;
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ throw new UnsupportedOperationException("topic protocol");
+ }
+
+ @Override
+ public List<String> getServers() {
+ throw new UnsupportedOperationException("topic servers");
+ }
+
+ @Override
+ public String[] getRecentEvents() {
+ throw new UnsupportedOperationException("topic events");
+ }
+
+ @Override
+ public void register(TopicListener topicListener) {
+ throw new UnsupportedOperationException("register topic");
+ }
+
+ @Override
+ public void unregister(TopicListener topicListener) {
+ throw new UnsupportedOperationException("unregister topic");
+ }
+
+ @Override
+ public synchronized boolean start() {
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ // do nothing
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return true;
+ }
+
+ @Override
+ public boolean lock() {
+ throw new UnsupportedOperationException("lock topicink");
+ }
+
+ @Override
+ public boolean unlock() {
+ throw new UnsupportedOperationException("unlock topic");
+ }
+
+ @Override
+ public boolean isLocked() {
+ throw new UnsupportedOperationException("topic isLocked");
+ }
+ }
+
+ /**
+ * Feature with overrides.
+ */
+
+ private static class PoolingFeatureImpl extends PoolingFeature {
+ private final Context context;
+
+ /**
+ * Constructor.
+ *
+ * @param context context
+ */
+
+ public PoolingFeatureImpl(Context context) {
+ this.context = context;
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public Properties getProperties(String featName) {
+ Properties props = new Properties();
+ props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+ props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+ props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), "" + stdOfflinePubWaitMs);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), "" + stdStartHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
+ props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
+ props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+ "" + stdActiveHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs);
+ return props;
+ }
+
+ @Override
+ public PolicyController getController(DroolsController droolsController) {
+ return context.getController(droolsController);
+ }
+
+ /**
+ * Embeds a specializer within a property name, after the prefix.
+ *
+ * @param propnm property name into which it should be embedded
+ * @param spec specializer to be embedded
+ * @return the property name, with the specializer embedded within it
+ */
+
+ private String specialize(String propnm, String spec) {
+ String suffix = propnm.substring(PREFIX.length());
+ return PREFIX + spec + "." + suffix;
+ }
+
+ @Override
+ protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ currentContext.set(context);
+ return new PoolingManagerTest(host, controller, props, activeLatch);
+ }
+ }
+
+ /**
+ * Pooling Manager with overrides.
+ */
+
+ private static class PoolingManagerTest extends PoolingManagerImpl {
+
+ /**
+ * Constructor.
+ *
+ * @param host the host
+ * @param controller the controller
+ * @param props the properties
+ * @param activeLatch the latch
+ */
+
+ public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ super(host, controller, props, activeLatch);
+ }
+
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManagerImpl(topic);
+ }
+
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools, String topic) {
+ return true;
+ }
+
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
+ return decodeEvent(event);
+ }
+ }
+
+ /**
+ * DMaaP Manager with overrides.
+ */
+
+ private static class TopicMessageManagerImpl extends TopicMessageManager {
+
+ /**
+ * Constructor.
+ *
+ * @param topic the topic
+ * @throws PoolingFeatureException if an error occurs
+ */
+
+ public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
+ super(topic);
+ }
+
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ return List.of(
+ new TopicSourceImpl(INTERNAL_TOPIC, currentContext.get().getCurrentHost().getInternalQueue()));
+ }
+
+ @Override
+ protected List<TopicSink> getTopicSinks() {
+ return List.of(new TopicSinkImpl(currentContext.get()));
+ }
+ }
+
+ /**
+ * Controller that also implements the {@link TopicListener} interface.
+ */
+
+ private static interface ListenerController extends PolicyController, TopicListener {
+ }
+
+ /**
+ * Simple function that takes no arguments and returns nothing.
+ */
+
+ @FunctionalInterface
+ private static interface VoidFunction {
+ void apply();
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
new file mode 100644
index 00000000..1bfba19c
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+
+class PoolingFeatureExceptionTest extends ExceptionsTester {
+
+ @Test
+ void test() {
+ assertEquals(5, test(PoolingFeatureException.class));
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java
new file mode 100644
index 00000000..a4305a7f
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureRtExceptionTest.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+
+class PoolingFeatureRtExceptionTest extends ExceptionsTester {
+
+ @Test
+ void test() {
+ assertEquals(5, test(PoolingFeatureRtException.class));
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
new file mode 100644
index 00000000..1b05e021
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -0,0 +1,548 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+
+class PoolingFeatureTest {
+
+ private static final String CONTROLLER1 = "controllerA";
+ private static final String CONTROLLER2 = "controllerB";
+ private static final String CONTROLLER_DISABLED = "controllerDisabled";
+ private static final String CONTROLLER_EX = "controllerException";
+ private static final String CONTROLLER_UNKNOWN = "controllerUnknown";
+
+ private static final String TOPIC1 = "topic.one";
+ private static final String TOPIC2 = "topic.two";
+
+ private static final String EVENT1 = "event.one";
+ private static final String EVENT2 = "event.two";
+
+ private static final Object OBJECT1 = new Object();
+ private static final Object OBJECT2 = new Object();
+
+ private Properties props;
+ private PolicyEngine engine;
+ private PolicyController controller1;
+ private PolicyController controller2;
+ private PolicyController controllerDisabled;
+ private PolicyController controllerException;
+ private PolicyController controllerUnknown;
+ private DroolsController drools1;
+ private DroolsController drools2;
+ private DroolsController droolsDisabled;
+ private List<Pair<PoolingManagerImpl, PoolingProperties>> managers;
+ private PoolingManagerImpl mgr1;
+ private PoolingManagerImpl mgr2;
+
+ private PoolingFeature pool;
+
+ /**
+ * Setup.
+ *
+ * @throws Exception exception
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ props = initProperties();
+ engine = mock(PolicyEngine.class);
+ controller1 = mock(PolicyController.class);
+ controller2 = mock(PolicyController.class);
+ controllerDisabled = mock(PolicyController.class);
+ controllerException = mock(PolicyController.class);
+ controllerUnknown = mock(PolicyController.class);
+ drools1 = mock(DroolsController.class);
+ drools2 = mock(DroolsController.class);
+ droolsDisabled = mock(DroolsController.class);
+ managers = new LinkedList<>();
+
+ when(controller1.getName()).thenReturn(CONTROLLER1);
+ when(controller2.getName()).thenReturn(CONTROLLER2);
+ when(controllerDisabled.getName()).thenReturn(CONTROLLER_DISABLED);
+ when(controllerException.getName()).thenReturn(CONTROLLER_EX);
+ when(controllerUnknown.getName()).thenReturn(CONTROLLER_UNKNOWN);
+
+ pool = new PoolingFeatureImpl();
+
+ pool.beforeStart(engine);
+
+ pool.afterCreate(controller1);
+ pool.afterCreate(controller2);
+
+ mgr1 = managers.get(0).getLeft();
+ mgr2 = managers.get(1).getLeft();
+ }
+
+ @Test
+ void test() {
+ assertEquals(2, managers.size());
+ }
+
+ @Test
+ void testGetHost() {
+ String host = pool.getHost();
+ assertNotNull(host);
+
+ // create another and ensure it generates another host name
+ pool = new PoolingFeatureImpl();
+ String host2 = pool.getHost();
+ assertNotNull(host2);
+
+ assertNotEquals(host, host2);
+ }
+
+ @Test
+ void testGetSequenceNumber() {
+ assertEquals(0, pool.getSequenceNumber());
+ }
+
+ @Test
+ void testBeforeStartEngine() {
+ pool = new PoolingFeatureImpl();
+
+ assertFalse(pool.beforeStart(engine));
+ }
+
+ @Test
+ void testAfterCreate() {
+ managers.clear();
+ pool = new PoolingFeatureImpl();
+ pool.beforeStart(engine);
+
+ assertFalse(pool.afterCreate(controller1));
+ assertEquals(1, managers.size());
+
+ // duplicate
+ assertFalse(pool.afterCreate(controller1));
+ assertEquals(1, managers.size());
+
+ // second controller
+ assertFalse(pool.afterCreate(controller2));
+ assertEquals(2, managers.size());
+ }
+
+ @Test
+ void testAfterCreate_NotEnabled() {
+ managers.clear();
+ pool = new PoolingFeatureImpl();
+ pool.beforeStart(engine);
+
+ assertFalse(pool.afterCreate(controllerDisabled));
+ assertTrue(managers.isEmpty());
+ }
+
+ @Test
+ void testAfterCreate_PropertyEx() {
+ managers.clear();
+ pool = new PoolingFeatureImpl();
+ pool.beforeStart(engine);
+
+ assertThrows(PoolingFeatureRtException.class, () -> pool.afterCreate(controllerException));
+ }
+
+ @Test
+ void testAfterCreate_NoProps() {
+ pool = new PoolingFeatureImpl();
+
+ // did not perform globalInit, which is an error
+
+ assertThrows(PoolingFeatureRtException.class, () -> pool.afterCreate(controller1));
+ }
+
+ @Test
+ void testAfterCreate_NoFeatProps() {
+ managers.clear();
+ pool = new PoolingFeatureImpl();
+ pool.beforeStart(engine);
+
+ assertFalse(pool.afterCreate(controllerUnknown));
+ assertTrue(managers.isEmpty());
+ }
+
+ @Test
+ void testBeforeStart() throws Exception {
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1, times(2)).beforeStart();
+
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test
+ void testAfterStart() {
+ assertFalse(pool.afterStart(controller1));
+ verify(mgr1).afterStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.afterStart(controller1));
+ verify(mgr1, times(2)).afterStart();
+
+ assertFalse(pool.afterStart(controllerDisabled));
+ }
+
+ @Test
+ void testBeforeStop() {
+ assertFalse(pool.beforeStop(controller1));
+ verify(mgr1).beforeStop();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStop(controller1));
+ verify(mgr1, times(2)).beforeStop();
+
+ assertFalse(pool.beforeStop(controllerDisabled));
+ }
+
+ @Test
+ void testAfterStop() {
+ assertFalse(pool.afterStop(controller1));
+ verify(mgr1).afterStop();
+
+ assertFalse(pool.afterStop(controllerDisabled));
+
+ // count should be unchanged
+ verify(mgr1).afterStop();
+ }
+
+ @Test
+ void testAfterHalt() {
+ assertFalse(pool.afterHalt(controller1));
+ assertFalse(pool.afterHalt(controller1));
+
+ verify(mgr1, never()).afterStop();
+
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test
+ void testAfterShutdown() {
+ assertFalse(pool.afterShutdown(controller1));
+ assertFalse(pool.afterShutdown(controller1));
+
+ verify(mgr1, never()).afterStop();
+
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test
+ void testBeforeLock() {
+ assertFalse(pool.beforeLock(controller1));
+ verify(mgr1).beforeLock();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeLock(controller1));
+ verify(mgr1, times(2)).beforeLock();
+
+ assertFalse(pool.beforeLock(controllerDisabled));
+ }
+
+ @Test
+ void testAfterUnlock() {
+ assertFalse(pool.afterUnlock(controller1));
+ verify(mgr1).afterUnlock();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.afterUnlock(controller1));
+ verify(mgr1, times(2)).afterUnlock();
+
+ assertFalse(pool.afterUnlock(controllerDisabled));
+ }
+
+ @Test
+ void testBeforeOffer() {
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
+
+ // ensure that the args were captured
+ pool.beforeInsert(drools1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
+
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
+
+ // ensure that the new args were captured
+ pool.beforeInsert(drools1, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
+
+
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ void testBeforeOffer_NotFound() {
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ void testBeforeOffer_MgrTrue() {
+
+ // manager will return true
+ when(mgr1.beforeOffer(any(), any())).thenReturn(true);
+
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
+
+ // ensure it's still in the map by re-invoking
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
+
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ }
+
+ @Test
+ void testBeforeInsert() {
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
+
+ // ensure it's still in the map by re-invoking
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(drools1, OBJECT2));
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
+
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ void testBeforeInsert_NoArgs() {
+
+ // call beforeInsert without beforeOffer
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+ }
+
+ @Test
+ void testBeforeInsert_ArgEx() {
+ // generate exception
+ pool = new PoolingFeatureImpl() {
+ @Override
+ protected PolicyController getController(DroolsController droolsController) {
+ throw new IllegalArgumentException();
+ }
+ };
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+ }
+
+ @Test
+ void testBeforeInsert_StateEx() {
+ // generate exception
+ pool = new PoolingFeatureImpl() {
+ @Override
+ protected PolicyController getController(DroolsController droolsController) {
+ throw new IllegalStateException();
+ }
+ };
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+ }
+
+ @Test
+ void testBeforeInsert_NullController() {
+
+ // return null controller
+ pool = new PoolingFeatureImpl() {
+ @Override
+ protected PolicyController getController(DroolsController droolsController) {
+ return null;
+ }
+ };
+
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+ }
+
+ @Test
+ void testBeforeInsert_NotFound() {
+
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ void testAfterOffer() {
+ // this will create OfferArgs
+ pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+
+ // this should clear them
+ assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
+
+ assertFalse(pool.beforeInsert(drools1, OBJECT1));
+ verify(mgr1, never()).beforeInsert(any(), any());
+
+
+ assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
+ }
+
+ @Test
+ void testDoManager() {
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller1));
+ verify(mgr1, times(2)).beforeStart();
+
+
+ // different controller
+ assertFalse(pool.beforeStart(controller2));
+ verify(mgr2).beforeStart();
+
+ // ensure it's still in the map by re-invoking
+ assertFalse(pool.beforeStart(controller2));
+ verify(mgr2, times(2)).beforeStart();
+
+
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test
+ void testDoManager_NotFound() {
+ assertFalse(pool.beforeStart(controllerDisabled));
+ }
+
+ @Test
+ void testDoManager_Ex() {
+
+ // generate exception
+ doThrow(new RuntimeException()).when(mgr1).beforeStart();
+
+ assertThrows(RuntimeException.class, () -> pool.beforeStart(controller1));
+ }
+
+ private Properties initProperties() {
+ Properties props = new Properties();
+
+ initProperties(props, "A", 0);
+ initProperties(props, "B", 1);
+ initProperties(props, "Exception", 2);
+
+ props.setProperty("pooling.controllerDisabled.enabled", "false");
+
+ props.setProperty("pooling.controllerException.offline.queue.limit", "INVALID NUMBER");
+
+ return props;
+ }
+
+ private void initProperties(Properties props, String suffix, int offset) {
+ props.setProperty("pooling.controller" + suffix + ".topic", "topic." + suffix);
+ props.setProperty("pooling.controller" + suffix + ".enabled", "true");
+ props.setProperty("pooling.controller" + suffix + ".offline.queue.limit", String.valueOf(5 + offset));
+ props.setProperty("pooling.controller" + suffix + ".offline.queue.age.milliseconds",
+ String.valueOf(100 + offset));
+ props.setProperty("pooling.controller" + suffix + ".start.heartbeat.milliseconds", String.valueOf(10 + offset));
+ props.setProperty("pooling.controller" + suffix + ".reactivate.milliseconds", String.valueOf(20 + offset));
+ props.setProperty("pooling.controller" + suffix + ".identification.milliseconds", String.valueOf(30 + offset));
+ props.setProperty("pooling.controller" + suffix + ".active.heartbeat.milliseconds",
+ String.valueOf(40 + offset));
+ props.setProperty("pooling.controller" + suffix + ".inter.heartbeat.milliseconds", String.valueOf(50 + offset));
+ }
+
+ /**
+ * Feature with overrides.
+ */
+ private class PoolingFeatureImpl extends PoolingFeature {
+
+ @Override
+ protected Properties getProperties(String featName) {
+ if (PoolingProperties.FEATURE_NAME.equals(featName)) {
+ return props;
+ } else {
+ throw new IllegalArgumentException("unknown feature name");
+ }
+ }
+
+ @Override
+ protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+
+ PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
+
+ managers.add(Pair.of(mgr, props));
+
+ return mgr;
+ }
+
+ @Override
+ protected PolicyController getController(DroolsController droolsController) {
+ if (droolsController == drools1) {
+ return controller1;
+ } else if (droolsController == drools2) {
+ return controller2;
+ } else if (droolsController == droolsDisabled) {
+ return controllerDisabled;
+ } else {
+ throw new IllegalArgumentException("unknown drools controller");
+ }
+ }
+
+ @Override
+ protected List<TopicSource> initTopicSources(Properties props) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected List<TopicSink> initTopicSinks(Properties props) {
+ return Collections.emptyList();
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
new file mode 100644
index 00000000..ac60ae27
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -0,0 +1,995 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.system.PolicyController;
+
+class PoolingManagerImplTest {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+ protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
+
+ private static final String MY_HOST = "my.host";
+ private static final String HOST2 = "other.host";
+
+ private static final String MY_CONTROLLER = "my.controller";
+ private static final String MY_TOPIC = "my.topic";
+
+ private static final String TOPIC2 = "topic.two";
+
+ private static final String THE_EVENT = "the event";
+
+ private static final Object DECODED_EVENT = new Object();
+
+ /**
+ * Number of publish() invocations that should be issued when the manager is
+ * started.
+ */
+ private static final int START_PUB = 1;
+
+ /**
+ * Futures that have been allocated due to calls to scheduleXxx().
+ */
+ private Queue<ScheduledFuture<?>> futures;
+
+ private PoolingProperties poolProps;
+ private ListeningController controller;
+ private TopicMessageManager topicMessageManager;
+ private boolean gotManager;
+ private ScheduledThreadPoolExecutor sched;
+ private int schedCount;
+ private DroolsController drools;
+ private Serializer ser;
+ private CountDownLatch active;
+
+ private PoolingManagerImpl mgr;
+
+ /**
+ * Setup.
+ *
+ * @throws Exception throws exception
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ Properties plainProps = new Properties();
+
+ poolProps = mock(PoolingProperties.class);
+ when(poolProps.getSource()).thenReturn(plainProps);
+ when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
+ when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+ when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
+
+ futures = new LinkedList<>();
+ ser = new Serializer();
+ active = new CountDownLatch(1);
+
+ topicMessageManager = mock(TopicMessageManager.class);
+ gotManager = false;
+ controller = mock(ListeningController.class);
+ sched = mock(ScheduledThreadPoolExecutor.class);
+ schedCount = 0;
+ drools = mock(DroolsController.class);
+
+ when(controller.getName()).thenReturn(MY_CONTROLLER);
+ when(controller.getDrools()).thenReturn(drools);
+ when(controller.isAlive()).thenReturn(true);
+
+ when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
+ .thenAnswer(args -> {
+ ScheduledFuture<?> fut = mock(ScheduledFuture.class);
+ futures.add(fut);
+
+ return fut;
+ });
+
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
+ }
+
+ @Test
+ void testPoolingManagerImpl() {
+ assertTrue(gotManager);
+
+ State st = mgr.getCurrent();
+ assertInstanceOf(IdleState.class, st);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ void testPoolingManagerImpl_PoolEx() {
+ // throw an exception when we try to create the topic messages manager
+ PoolingFeatureException ex = new PoolingFeatureException();
+
+ assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ throw ex;
+ }
+ }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
+ }
+
+ @Test
+ void testGetCurrent() throws Exception {
+ assertEquals(IdleState.class, mgr.getCurrent().getClass());
+
+ startMgr();
+
+ assertEquals(StartState.class, mgr.getCurrent().getClass());
+ }
+
+ @Test
+ void testGetHost() {
+ assertEquals(MY_HOST, mgr.getHost());
+
+ mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
+ assertEquals(HOST2, mgr.getHost());
+ }
+
+ @Test
+ void testGetTopic() {
+ assertEquals(MY_TOPIC, mgr.getTopic());
+ }
+
+ @Test
+ void testGetProperties() {
+ assertEquals(poolProps, mgr.getProperties());
+ }
+
+ @Test
+ void testBeforeStart() {
+ // not running yet
+ mgr.beforeStart();
+
+ verify(topicMessageManager).startPublisher();
+
+ assertEquals(1, schedCount);
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+
+ // try again - nothing should happen
+ mgr.beforeStart();
+
+ verify(topicMessageManager).startPublisher();
+
+ assertEquals(1, schedCount);
+ verify(sched).setMaximumPoolSize(1);
+ verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+
+ @Test
+ void testAfterStart() throws Exception {
+ startMgr();
+
+ verify(topicMessageManager).startConsumer(mgr);
+
+ State st = mgr.getCurrent();
+ assertInstanceOf(StartState.class, st);
+
+ // ensure the state is attached to the manager
+ assertEquals(mgr.getHost(), st.getHost());
+
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+ verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+
+ // already started - nothing else happens
+ mgr.afterStart();
+
+ verify(topicMessageManager).startConsumer(mgr);
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+
+ verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
+ }
+
+ @Test
+ void testBeforeStop() throws Exception {
+ startMgr();
+ mgr.startDistributing(makeAssignments(false));
+
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+
+ mgr.beforeStop();
+
+ verify(topicMessageManager).stopConsumer(mgr);
+ verify(sched).shutdownNow();
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+ verify(topicMessageManager).publish(contains("offline"));
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+
+ // verify that next message is handled locally
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+ }
+
+ @Test
+ void testBeforeStop_NotRunning() {
+ final State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ verify(topicMessageManager, never()).stopConsumer(any());
+ verify(sched, never()).shutdownNow();
+
+ // hasn't changed states either
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ void testBeforeStop_AfterPartialStart() {
+ // call beforeStart but not afterStart
+ mgr.beforeStart();
+
+ final State st = mgr.getCurrent();
+
+ mgr.beforeStop();
+
+ // should still shut the scheduler down
+ verify(sched).shutdownNow();
+
+ verify(topicMessageManager, never()).stopConsumer(any());
+
+ // hasn't changed states
+ assertEquals(st, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterStop() throws Exception {
+ startMgr();
+ mgr.beforeStop();
+
+ mgr.afterStop();
+
+ verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
+ }
+
+ @Test
+ void testBeforeLock() throws Exception {
+ startMgr();
+
+ mgr.beforeLock();
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_AliveIdle() {
+ // this really shouldn't happen
+
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ // stays in idle state, because it has no scheduler
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_AliveStarted() throws Exception {
+ startMgr();
+ lockMgr();
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_StoppedIdle() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testAfterUnlock_StoppedStarted() throws Exception {
+ startMgr();
+
+ // Note: don't lockMgr()
+
+ // controller is stopped
+ when(controller.isAlive()).thenReturn(false);
+
+ mgr.afterUnlock();
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testChangeState() throws Exception {
+ // start should invoke changeState()
+ startMgr();
+
+ /*
+ * now go offline while it's locked
+ */
+ lockMgr();
+
+ // should have cancelled the timers
+ assertEquals(2, futures.size());
+ verify(futures.poll()).cancel(false);
+ verify(futures.poll()).cancel(false);
+
+ /*
+ * now go back online
+ */
+ unlockMgr();
+
+ // new timers should now be active
+ assertEquals(2, futures.size());
+ verify(futures.poll(), never()).cancel(false);
+ verify(futures.poll(), never()).cancel(false);
+ }
+
+ @Test
+ void testSchedule() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
+
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testScheduleWithFixedDelay() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
+
+ verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+ unitCap.capture());
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
+ assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testPublishAdmin() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publishAdmin(msg);
+
+ assertEquals(Message.ADMIN, msg.getChannel());
+
+ verify(topicMessageManager).publish(any());
+ }
+
+ @Test
+ void testPublish() throws Exception {
+ Offline msg = new Offline(mgr.getHost());
+ mgr.publish("my.channel", msg);
+
+ assertEquals("my.channel", msg.getChannel());
+
+ verify(topicMessageManager).publish(any());
+ }
+
+ @Test
+ void testPublish_InvalidMsg() throws Exception {
+ // message is missing data
+ mgr.publish(Message.ADMIN, new Offline());
+
+ // should not have attempted to publish it
+ verify(topicMessageManager, never()).publish(any());
+ }
+
+ @Test
+ void testPublish_TopicMessageMngEx() throws Exception {
+
+ // generate exception
+ doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
+
+ assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testOnTopicEvent() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testOnTopicEvent_NullEvent() throws Exception {
+ startMgr();
+
+ assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testBeforeOffer_Unlocked() throws Exception {
+ startMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testBeforeOffer_Locked() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testBeforeInsert() throws Exception {
+ startMgr();
+ lockMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
+ validateHandleReqId(null);
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
+ validateHandleReqId("");
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
+ startMgr();
+
+ assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
+ }
+
+ @Test
+ void testHandleExternalCommInfrastructureStringStringString() throws Exception {
+ validateUnhandled();
+ }
+
+ @Test
+ void testHandleExternalForward_NoAssignments() throws Exception {
+ validateUnhandled();
+ }
+
+ @Test
+ void testHandleExternalForward() throws Exception {
+ validateNoForward();
+ }
+
+ @Test
+ void testHandleEvent_NullTarget() throws Exception {
+ // buckets have null targets
+ validateDiscarded(new BucketAssignments(new String[] {null, null}));
+ }
+
+ @Test
+ void testHandleEvent_SameHost() throws Exception {
+ validateNoForward();
+ }
+
+ @Test
+ void testHandleEvent_DiffHost() throws Exception {
+ // route the message to the *OTHER* host
+ validateDiscarded(makeAssignments(false));
+ }
+
+ @Test
+ void testDecodeEvent_CannotDecode() throws Exception {
+
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
+ return false;
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_UnsuppEx() throws Exception {
+
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_ArgEx() throws Exception {
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new IllegalArgumentException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent_StateEx() throws Exception {
+ // generate exception
+ mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ throw new IllegalStateException();
+ }
+ };
+
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // create assignments, though they are irrelevant
+ mgr.startDistributing(makeAssignments(false));
+
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testDecodeEvent() throws Exception {
+ startMgr();
+
+ when(controller.isLocked()).thenReturn(true);
+
+ // route to another host
+ mgr.startDistributing(makeAssignments(false));
+
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
+ }
+
+ @Test
+ void testHandleInternal() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ /*
+ * give it its heart beat, that should cause it to transition to the Query state.
+ */
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testHandleInternal_IoEx() throws Exception {
+ startMgr();
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testHandleInternal_PoolEx() throws Exception {
+ startMgr();
+
+ StartState st = (StartState) mgr.getCurrent();
+
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+
+ /*
+ * do NOT set the channel - this will cause the message to be invalid, triggering
+ * an exception
+ */
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(StartState.class, mgr.getCurrent());
+ }
+
+ @Test
+ void testStartDistributing() throws Exception {
+ validateNoForward();
+
+
+ // null assignments should cause message to be processed locally
+ mgr.startDistributing(null);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+
+
+ // message for this host
+ mgr.startDistributing(makeAssignments(true));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+
+
+ // message for another host
+ mgr.startDistributing(makeAssignments(false));
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ @Test
+ void testGoStart() {
+ State st = mgr.goStart();
+ assertInstanceOf(StartState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ }
+
+ @Test
+ void testGoQuery() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goQuery();
+
+ assertInstanceOf(QueryState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ }
+
+ @Test
+ void testGoActive() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
+ mgr.startDistributing(asgn);
+
+ State st = mgr.goActive();
+
+ assertInstanceOf(ActiveState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(asgn, mgr.getAssignments());
+ assertEquals(0, active.getCount());
+ }
+
+ @Test
+ void testGoInactive() {
+ State st = mgr.goInactive();
+ assertInstanceOf(InactiveState.class, st);
+ assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(1, active.getCount());
+ }
+
+ @Test
+ void testTimerActionRun() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // execute it
+ taskCap.getValue().run();
+
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ void testTimerActionRun_DiffState() throws Exception {
+ // must start the scheduler
+ startMgr();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
+ latch.countDown();
+ return null;
+ });
+
+ // capture the task
+ ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
+
+ verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
+
+ // give it a heartbeat so that it transitions to the query state
+ StartState st = (StartState) mgr.getCurrent();
+ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
+ hb.setChannel(Message.ADMIN);
+
+ String msg = ser.encodeMsg(hb);
+
+ mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
+
+ // execute it
+ taskCap.getValue().run();
+
+ // it should NOT have counted down
+ assertEquals(1, latch.getCount());
+ }
+
+ private void validateHandleReqId(String requestId) throws PoolingFeatureException {
+ startMgr();
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ private void validateNoForward() throws PoolingFeatureException {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+
+ verify(topicMessageManager, times(START_PUB)).publish(any());
+ }
+
+ private void validateUnhandled() throws PoolingFeatureException {
+ startMgr();
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
+ startMgr();
+
+ // buckets have null targets
+ mgr.startDistributing(bucketAssignments);
+
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ }
+
+ /**
+ * Makes an assignment with two buckets.
+ *
+ * @param sameHost {@code true} if the REQUEST_ID should hash to the
+ * manager's bucket, {@code false} if it should hash to the other host's bucket
+ * @return a new bucket assignment
+ */
+ private BucketAssignments makeAssignments(boolean sameHost) {
+ int slot = DECODED_EVENT.hashCode() % 2;
+
+ // slot numbers are 0 and 1 - reverse them if it's for a different host
+ if (!sameHost) {
+ slot = 1 - slot;
+ }
+
+ String[] asgn = new String[2];
+ asgn[slot] = mgr.getHost();
+ asgn[1 - slot] = HOST2;
+
+ return new BucketAssignments(asgn);
+ }
+
+ /**
+ * Invokes methods necessary to start the manager.
+ *
+ */
+ private void startMgr() {
+ mgr.beforeStart();
+ mgr.afterStart();
+ }
+
+ /**
+ * Invokes methods necessary to lock the manager.
+ */
+ private void lockMgr() {
+ mgr.beforeLock();
+ when(controller.isLocked()).thenReturn(true);
+ }
+
+ /**
+ * Invokes methods necessary to unlock the manager.
+ */
+ private void unlockMgr() {
+ mgr.afterUnlock();
+ when(controller.isLocked()).thenReturn(false);
+ }
+
+ /**
+ * Used to create a mock object that implements both super interfaces.
+ */
+ private static interface ListeningController extends TopicListener, PolicyController {
+
+ }
+
+ /**
+ * Manager with overrides.
+ */
+ private class PoolingManagerTest extends PoolingManagerImpl {
+
+ public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+
+ super(host, controller, props, activeLatch);
+ }
+
+ @Override
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ gotManager = true;
+ return topicMessageManager;
+ }
+
+ @Override
+ protected ScheduledThreadPoolExecutor makeScheduler() {
+ ++schedCount;
+ return sched;
+ }
+
+ @Override
+ protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
+ return (drools2 == drools && TOPIC2.equals(topic2));
+ }
+
+ @Override
+ protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
+ if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
+ return DECODED_EVENT;
+ } else {
+ return null;
+ }
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
new file mode 100644
index 00000000..383e0071
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
@@ -0,0 +1,190 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.onap.policy.drools.pooling.PoolingProperties.ACTIVE_HEARTBEAT_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.FEATURE_ENABLED;
+import static org.onap.policy.drools.pooling.PoolingProperties.IDENTIFICATION_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.INTER_HEARTBEAT_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_AGE_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_LIMIT;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_PUB_WAIT_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.POOLING_TOPIC;
+import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
+import static org.onap.policy.drools.pooling.PoolingProperties.REACTIVATE_MS;
+import static org.onap.policy.drools.pooling.PoolingProperties.START_HEARTBEAT_MS;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+public class PoolingPropertiesTest {
+
+ private static final String CONTROLLER = "a.controller";
+
+ private static final String STD_POOLING_TOPIC = "my.topic";
+ public static final boolean STD_FEATURE_ENABLED = true;
+ public static final int STD_OFFLINE_LIMIT = 10;
+ public static final long STD_OFFLINE_AGE_MS = 1000L;
+ public static final long STD_OFFLINE_PUB_WAIT_MS = 2000L;
+ public static final long STD_START_HEARTBEAT_MS = 3000L;
+ public static final long STD_REACTIVATE_MS = 4000L;
+ public static final long STD_IDENTIFICATION_MS = 5000L;
+ public static final long STD_ACTIVE_HEARTBEAT_MS = 7000L;
+ public static final long STD_INTER_HEARTBEAT_MS = 8000L;
+
+ private Properties plain;
+ private PoolingProperties pooling;
+
+ /**
+ * Setup.
+ *
+ * @throws Exception throws an exception
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ plain = makeProperties();
+
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ }
+
+ @Test
+ void testPoolingProperties() {
+ // ensure no exceptions
+ assertThatCode(() -> new PoolingProperties(CONTROLLER, plain)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testGetSource() {
+ assertEquals(plain, pooling.getSource());
+ }
+
+ @Test
+ void testGetPoolingTopic() {
+ assertEquals(STD_POOLING_TOPIC, pooling.getPoolingTopic());
+ }
+
+ @Test
+ void testGetOfflineLimit() throws PropertyException {
+ doTest(OFFLINE_LIMIT, STD_OFFLINE_LIMIT, 1000, xxx -> pooling.getOfflineLimit());
+ }
+
+ @Test
+ void testGetOfflineAgeMs() throws PropertyException {
+ doTest(OFFLINE_AGE_MS, STD_OFFLINE_AGE_MS, 60000L, xxx -> pooling.getOfflineAgeMs());
+ }
+
+ @Test
+ void testGetOfflinePubWaitMs() throws PropertyException {
+ doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
+ }
+
+ @Test
+ void testGetStartHeartbeatMs() throws PropertyException {
+ doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 100000L, xxx -> pooling.getStartHeartbeatMs());
+ }
+
+ @Test
+ void testGetReactivateMs() throws PropertyException {
+ doTest(REACTIVATE_MS, STD_REACTIVATE_MS, 50000L, xxx -> pooling.getReactivateMs());
+ }
+
+ @Test
+ void testGetIdentificationMs() throws PropertyException {
+ doTest(IDENTIFICATION_MS, STD_IDENTIFICATION_MS, 50000L, xxx -> pooling.getIdentificationMs());
+ }
+
+ @Test
+ void testGetActiveHeartbeatMs() throws PropertyException {
+ doTest(ACTIVE_HEARTBEAT_MS, STD_ACTIVE_HEARTBEAT_MS, 50000L, xxx -> pooling.getActiveHeartbeatMs());
+ }
+
+ @Test
+ void testGetInterHeartbeatMs() throws PropertyException {
+ doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
+ }
+
+ /**
+ * Tests a particular property. Verifies that the correct value is returned if the
+ * specialized property has a value or the property has no value. Also verifies that
+ * the property name can be generalized.
+ *
+ * @param propnm name of the property of interest
+ * @param specValue expected specialized value
+ * @param dfltValue expected default value
+ * @param func function to get the field
+ * @throws PropertyException if an error occurs
+ */
+ private <T> void doTest(String propnm, T specValue, T dfltValue, Function<Void, T> func) throws PropertyException {
+ /*
+ * With specialized property
+ */
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ assertEquals(specValue, func.apply(null), "special " + propnm);
+
+ /*
+ * Without the property - should use the default value.
+ */
+ plain.remove(specialize(propnm));
+ plain.remove(propnm);
+ pooling = new PoolingProperties(CONTROLLER, plain);
+ assertEquals(dfltValue, func.apply(null), "default " + propnm);
+ }
+
+ /**
+ * Makes a set of properties, where all the properties are specialized for the
+ * controller.
+ *
+ * @return a new property set
+ */
+ private Properties makeProperties() {
+ Properties props = new Properties();
+
+ props.setProperty(specialize(POOLING_TOPIC), STD_POOLING_TOPIC);
+ props.setProperty(specialize(FEATURE_ENABLED), "" + STD_FEATURE_ENABLED);
+ props.setProperty(specialize(OFFLINE_LIMIT), "" + STD_OFFLINE_LIMIT);
+ props.setProperty(specialize(OFFLINE_AGE_MS), "" + STD_OFFLINE_AGE_MS);
+ props.setProperty(specialize(OFFLINE_PUB_WAIT_MS), "" + STD_OFFLINE_PUB_WAIT_MS);
+ props.setProperty(specialize(START_HEARTBEAT_MS), "" + STD_START_HEARTBEAT_MS);
+ props.setProperty(specialize(REACTIVATE_MS), "" + STD_REACTIVATE_MS);
+ props.setProperty(specialize(IDENTIFICATION_MS), "" + STD_IDENTIFICATION_MS);
+ props.setProperty(specialize(ACTIVE_HEARTBEAT_MS), "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty(specialize(INTER_HEARTBEAT_MS), "" + STD_INTER_HEARTBEAT_MS);
+
+ return props;
+ }
+
+ /**
+ * Embeds a specializer within a property name, after the prefix.
+ *
+ * @param propnm property name into which it should be embedded
+ * @return the property name, with the specializer embedded within it
+ */
+ private String specialize(String propnm) {
+ String suffix = propnm.substring(PREFIX.length());
+ return PREFIX + PoolingPropertiesTest.CONTROLLER + "." + suffix;
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
new file mode 100644
index 00000000..a81ea68b
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.gson.JsonParseException;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
+
+class SerializerTest {
+
+ @Test
+ void testSerializer() {
+ assertThatCode(Serializer::new).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testEncodeMsg_testDecodeMsg() {
+ Serializer ser = new Serializer();
+
+ Query msg = new Query("hostA");
+ msg.setChannel("channelB");
+
+ String encoded = ser.encodeMsg(msg);
+ assertNotNull(encoded);
+
+ Message decoded = ser.decodeMsg(encoded);
+ assertEquals(Query.class, decoded.getClass());
+
+ assertEquals(msg.getSource(), decoded.getSource());
+ assertEquals(msg.getChannel(), decoded.getChannel());
+
+ // should work a second time, too
+ encoded = ser.encodeMsg(msg);
+ assertNotNull(encoded);
+
+ decoded = ser.decodeMsg(encoded);
+ assertEquals(Query.class, decoded.getClass());
+
+ assertEquals(msg.getSource(), decoded.getSource());
+ assertEquals(msg.getChannel(), decoded.getChannel());
+
+ // invalid subclass when encoding
+ Message msg2 = new Message() {};
+ assertThatThrownBy(() -> ser.encodeMsg(msg2)).isInstanceOf(JsonParseException.class)
+ .hasMessageContaining("cannot serialize");
+
+ // missing type when decoding
+ final String enc2 = encoded.replaceAll("type", "other-field-name");
+
+ assertThatThrownBy(() -> ser.decodeMsg(enc2)).isInstanceOf(JsonParseException.class)
+ .hasMessageContaining("does not contain a field named");
+
+ // invalid type
+ final String enc3 = encoded.replaceAll("query", "invalid-type");
+
+ assertThatThrownBy(() -> ser.decodeMsg(enc3)).isInstanceOf(JsonParseException.class)
+ .hasMessage("cannot deserialize \"invalid-type\"");
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java
new file mode 100644
index 00000000..74098487
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java
@@ -0,0 +1,322 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+
+class TopicMessageManagerTest {
+
+ private static final String EXPECTED = "expected";
+ private static final String MY_TOPIC = "my.topic";
+ private static final String MSG = "a message";
+
+ private TopicListener listener;
+ private TopicSource source;
+ private boolean gotSources;
+ private TopicSink sink;
+ private boolean gotSinks;
+ private TopicMessageManager mgr;
+
+ /**
+ * Setup.
+ *
+ * @throws Exception throws an exception
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ listener = mock(TopicListener.class);
+ source = mock(TopicSource.class);
+ gotSources = false;
+ sink = mock(TopicSink.class);
+ gotSinks = false;
+
+ when(source.getTopic()).thenReturn(MY_TOPIC);
+
+ when(sink.getTopic()).thenReturn(MY_TOPIC);
+ when(sink.send(any())).thenReturn(true);
+
+ mgr = new TopicMessageManagerImpl(MY_TOPIC);
+ }
+
+ @Test
+ void testTopicMessageManager() {
+ // verify that the init methods were called
+ assertTrue(gotSources);
+ assertTrue(gotSinks);
+ }
+
+ @Test
+ void testTopicMessageManager_PoolingEx() {
+ // force error by having no topics match
+ when(source.getTopic()).thenReturn("");
+
+ assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC));
+ }
+
+ @Test
+ void testTopicMessageManager_IllegalArgEx() {
+ // force error
+ assertThrows(PoolingFeatureException.class, () ->
+ new TopicMessageManagerImpl(MY_TOPIC) {
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ throw new IllegalArgumentException(EXPECTED);
+ }
+ });
+ }
+
+ @Test
+ void testGetTopic() {
+ assertEquals(MY_TOPIC, mgr.getTopic());
+ }
+
+ @Test
+ void testFindTopicSource_NotFound() {
+ // one item in list, and its topic doesn't match
+ assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ return Collections.singletonList(mock(TopicSource.class));
+ }
+ });
+ }
+
+ @Test
+ void testFindTopicSource_EmptyList() {
+ // empty list
+ assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ @Test
+ void testFindTopicSink_NotFound() {
+ // one item in list, and its topic doesn't match
+ assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
+ @Override
+ protected List<TopicSink> getTopicSinks() {
+ return Collections.singletonList(mock(TopicSink.class));
+ }
+ });
+ }
+
+ @Test
+ void testFindTopicSink_EmptyList() {
+ // empty list
+ assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
+ @Override
+ protected List<TopicSink> getTopicSinks() {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ @Test
+ void testStartPublisher() throws PoolingFeatureException {
+
+ mgr.startPublisher();
+
+ // restart should have no effect
+ mgr.startPublisher();
+
+ // should be able to publish now
+ mgr.publish(MSG);
+ verify(sink).send(MSG);
+ }
+
+ @Test
+ void testStopPublisher() {
+ // not publishing yet, so stopping should have no effect
+ mgr.stopPublisher(0);
+
+ // now start it
+ mgr.startPublisher();
+
+ // this time, stop should do something
+ mgr.stopPublisher(0);
+
+ // re-stopping should have no effect
+ assertThatCode(() -> mgr.stopPublisher(0)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testStopPublisher_WithDelay() {
+
+ mgr.startPublisher();
+
+ long tbeg = System.currentTimeMillis();
+
+ mgr.stopPublisher(100L);
+
+ assertTrue(System.currentTimeMillis() >= tbeg + 100L);
+ }
+
+ @Test
+ void testStopPublisher_WithDelayInterrupted() throws Exception {
+
+ mgr.startPublisher();
+
+ long minms = 2000L;
+
+ // tell the publisher to stop in minms + additional time
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread(() -> {
+ latch.countDown();
+ mgr.stopPublisher(minms + 3000L);
+ });
+ thread.start();
+
+ // wait for the thread to start
+ latch.await();
+
+ // interrupt it - it should immediately finish its work
+ thread.interrupt();
+
+ // wait for it to stop, but only wait the minimum time
+ thread.join(minms);
+
+ assertFalse(thread.isAlive());
+ }
+
+ @Test
+ void testStartConsumer() {
+ // not started yet
+ verify(source, never()).register(any());
+
+ mgr.startConsumer(listener);
+ verify(source).register(listener);
+
+ // restart should have no effect
+ mgr.startConsumer(listener);
+ verify(source).register(listener);
+ }
+
+ @Test
+ void testStopConsumer() {
+ // not consuming yet, so stopping should have no effect
+ mgr.stopConsumer(listener);
+ verify(source, never()).unregister(any());
+
+ // now start it
+ mgr.startConsumer(listener);
+
+ // this time, stop should do something
+ mgr.stopConsumer(listener);
+ verify(source).unregister(listener);
+
+ // re-stopping should have no effect
+ mgr.stopConsumer(listener);
+ verify(source).unregister(listener);
+ }
+
+ @Test
+ void testPublish() throws PoolingFeatureException {
+ // cannot publish before starting
+ assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
+
+ mgr.startPublisher();
+
+ // publish several messages
+ mgr.publish(MSG);
+ verify(sink).send(MSG);
+
+ mgr.publish(MSG + "a");
+ verify(sink).send(MSG + "a");
+
+ mgr.publish(MSG + "b");
+ verify(sink).send(MSG + "b");
+
+ // stop and verify we can no longer publish
+ mgr.stopPublisher(0);
+ assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,stopped").isInstanceOf(PoolingFeatureException.class);
+ }
+
+ @Test
+ void testPublish_SendFailed() {
+ mgr.startPublisher();
+
+ // arrange for send() to fail
+ when(sink.send(MSG)).thenReturn(false);
+
+ assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
+ }
+
+ @Test
+ void testPublish_SendEx() {
+ mgr.startPublisher();
+
+ // arrange for send() to throw an exception
+ doThrow(new IllegalStateException(EXPECTED)).when(sink).send(MSG);
+
+ assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
+ }
+
+ /**
+ * Manager with overrides.
+ */
+ private class TopicMessageManagerImpl extends TopicMessageManager {
+
+ public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
+ super(topic);
+ }
+
+ @Override
+ protected List<TopicSource> getTopicSources() {
+ gotSources = true;
+
+ // three sources, with the desired one in the middle
+ return Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class));
+ }
+
+ @Override
+ protected List<TopicSink> getTopicSinks() {
+ gotSinks = true;
+
+ // three sinks, with the desired one in the middle
+ return Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class));
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/feature-pooling-messages.properties b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/feature-pooling-messages.properties
new file mode 100644
index 00000000..b89e4062
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/feature-pooling-messages.properties
@@ -0,0 +1,47 @@
+# Copyright 2018 AT&T Intellectual Property. All rights reserved
+# Modifications Copyright (C) 2024 Nordix Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pooling.controllerA.topic = topic.A
+pooling.controllerA.enabled = true
+pooling.controllerA.offline.queue.limit = 5
+pooling.controllerA.offline.queue.age.milliseconds = 100
+pooling.controllerA.start.heartbeat.milliseconds = 10
+pooling.controllerA.reactivate.milliseconds = 20
+pooling.controllerA.identification.milliseconds = 30
+pooling.controllerA.active.heartbeat.milliseconds = 40
+pooling.controllerA.inter.heartbeat.milliseconds = 50
+
+pooling.controllerB.topic = topic.B
+pooling.controllerB.enabled = true
+pooling.controllerB.offline.queue.limit = 6
+pooling.controllerB.offline.queue.age.milliseconds = 101
+pooling.controllerB.start.heartbeat.milliseconds = 11
+pooling.controllerB.reactivate.milliseconds = 21
+pooling.controllerB.identification.milliseconds = 31
+pooling.controllerB.active.heartbeat.milliseconds = 41
+pooling.controllerB.inter.heartbeat.milliseconds = 51
+
+pooling.controllerDisabled.enabled = false
+
+# this has an invalid property
+pooling.controllerException.topic = topic.B
+pooling.controllerException.enabled = true
+pooling.controllerException.offline.queue.limit = INVALID NUMBER
+pooling.controllerException.offline.queue.age.milliseconds = 101
+pooling.controllerException.start.heartbeat.milliseconds = 11
+pooling.controllerException.reactivate.milliseconds = 21
+pooling.controllerException.identification.milliseconds = 31
+pooling.controllerException.active.heartbeat.milliseconds = 41
+pooling.controllerException.inter.heartbeat.milliseconds = 51
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
new file mode 100644
index 00000000..ca47f9c1
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
@@ -0,0 +1,361 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+
+class BucketAssignmentsTest {
+
+ @Test
+ void testBucketAssignments() {
+ assertThatCode(BucketAssignments::new).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testBucketAssignmentsStringArray() {
+ String[] arr = {"abc", "def"};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(Arrays.toString(arr), Arrays.toString(asgn.getHostArray()));
+ }
+
+ @Test
+ void testGetHostArray_testSetHostArray() {
+
+ String[] arr = {"abc", "def"};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(Arrays.toString(arr), Arrays.toString(asgn.getHostArray()));
+
+ String[] arr2 = {"xyz"};
+ asgn.setHostArray(arr2);
+
+ assertNotNull(asgn.getHostArray());
+ assertEquals(Arrays.toString(arr2), Arrays.toString(asgn.getHostArray()));
+ }
+
+ @Test
+ void testGetLeader() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertNull(asgn.getLeader());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertNull(asgn.getLeader());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertNull(asgn.getLeader());
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertEquals("abc", asgn.getLeader());
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertEquals("abc", asgn.getLeader());
+
+ // first is least
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", "Chost"});
+ assertEquals("Ahost", asgn.getLeader());
+
+ // middle is least
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost"});
+ assertEquals("Bhost", asgn.getLeader());
+
+ // last is least
+ asgn.setHostArray(new String[] {"Xhost", "Yhost", "Chost"});
+ assertEquals("Chost", asgn.getLeader());
+
+ // multiple entries
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost", "Bhost", "Xhost", "Chost"});
+ assertEquals("Bhost", asgn.getLeader());
+ }
+
+ @Test
+ void testHasAssignment() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertFalse(asgn.hasAssignment("abc"));
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears as first entry
+ asgn.setHostArray(new String[] {"abc", "Bhost", "Chost"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears in middle
+ asgn.setHostArray(new String[] {"Xhost", "abc", "Chost"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears last
+ asgn.setHostArray(new String[] {"Xhost", "Yhost", "abc"});
+ assertTrue(asgn.hasAssignment("abc"));
+
+ // appears repeatedly
+ asgn.setHostArray(new String[] {"Xhost", "Bhost", "Chost", "Bhost", "Xhost", "Chost"});
+ assertTrue(asgn.hasAssignment("Bhost"));
+ }
+
+ @Test
+ void testGetAllHosts() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertEquals("[]", getSortedHosts(asgn).toString());
+
+ // some entries are null
+ asgn.setHostArray(new String[] {null, "abc", null});
+ assertEquals("[abc]", getSortedHosts(asgn).toString());
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ assertEquals("[abc]", getSortedHosts(asgn).toString());
+
+ // multiple, repeated entries
+ asgn.setHostArray(new String[] {"def", "abc", "def", "ghi", "def", "def", "xyz"});
+ assertEquals("[abc, def, ghi, xyz]", getSortedHosts(asgn).toString());
+ }
+
+ /**
+ * Gets the hosts, sorted, so that the order is predictable.
+ *
+ * @param asgn assignment whose hosts are to be retrieved
+ * @return a new, sorted set of hosts
+ */
+ private SortedSet<String> getSortedHosts(BucketAssignments asgn) {
+ return new TreeSet<>(asgn.getAllHosts());
+ }
+
+ @Test
+ void testGetAssignedHost() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertNull(asgn.getAssignedHost(3));
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertNull(asgn.getAssignedHost(3));
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertNull(asgn.getAssignedHost(3));
+
+ // multiple, repeated entries
+ String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"};
+ asgn.setHostArray(arr);
+
+ /*
+ * get assignments for consecutive integers, including negative numbers and
+ * numbers extending past the length of the array.
+ *
+ */
+ TreeSet<String> seen = new TreeSet<>();
+ for (int x = -1; x < arr.length + 2; ++x) {
+ seen.add(asgn.getAssignedHost(x));
+ }
+
+ TreeSet<String> expected = new TreeSet<>(Arrays.asList(arr));
+ assertEquals(expected, seen);
+
+ // try a much bigger number
+ assertNotNull(asgn.getAssignedHost(arr.length * 1000));
+ }
+
+ @Test
+ void testSize() {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ assertEquals(0, asgn.size());
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ assertEquals(0, asgn.size());
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ assertEquals(5, asgn.size());
+
+ // multiple, repeated entries
+ String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"};
+ asgn.setHostArray(arr);
+ assertEquals(arr.length, asgn.size());
+ }
+
+ @Test
+ void testCheckValidity() throws Exception {
+ // host array is null
+ BucketAssignments asgn = new BucketAssignments();
+ expectException(asgn);
+
+ // array is non-null, but empty
+ asgn.setHostArray(new String[0]);
+ expectException(asgn);
+
+ // array is too big
+ asgn.setHostArray(new String[BucketAssignments.MAX_BUCKETS + 1]);
+ expectException(asgn);
+
+ // all entries are null
+ asgn.setHostArray(new String[5]);
+ expectException(asgn);
+
+ // null at the beginning
+ asgn.setHostArray(new String[] {null, "Bhost", "Chost"});
+ expectException(asgn);
+
+ // null in the middle
+ asgn.setHostArray(new String[] {"Ahost", null, "Chost"});
+ expectException(asgn);
+
+ // null at the end
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", null});
+ expectException(asgn);
+
+ // only one entry
+ asgn.setHostArray(new String[] {"abc"});
+ asgn.checkValidity();
+
+ // multiple entries
+ asgn.setHostArray(new String[] {"Ahost", "Bhost", "Chost"});
+ asgn.checkValidity();
+ }
+
+ @Test
+ void testHashCode() {
+ // with null assignments
+ BucketAssignments asgn = new BucketAssignments();
+ asgn.hashCode();
+
+ // with empty array
+ asgn = new BucketAssignments(new String[0]);
+ asgn.hashCode();
+
+ // with null items
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ asgn.hashCode();
+
+ // same assignments
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ int code = asgn.hashCode();
+
+ asgn = new BucketAssignments(new String[] {"abc", null, "def"});
+ assertEquals(code, asgn.hashCode());
+
+ // slightly different values (i.e., changed "def" to "eef")
+ asgn = new BucketAssignments(new String[] {"abc", null, "eef"});
+ assertNotEquals(code, asgn.hashCode());
+ }
+
+ @Test
+ void testEquals() {
+ // null object
+ BucketAssignments asgn = new BucketAssignments();
+ assertNotEquals(null, asgn);
+
+ // same object
+ asgn = new BucketAssignments();
+ assertEquals(asgn, asgn);
+
+ // different class of object
+ asgn = new BucketAssignments();
+ assertNotEquals("not an assignment object", asgn);
+
+ assertNotEquals(asgn, new BucketAssignments(new String[] {"abc"}));
+
+ // with null assignments
+ asgn = new BucketAssignments();
+ assertEquals(asgn, new BucketAssignments());
+
+ // with empty array
+ asgn = new BucketAssignments(new String[0]);
+ assertEquals(asgn, asgn);
+
+ assertNotEquals(asgn, new BucketAssignments());
+ assertNotEquals(asgn, new BucketAssignments(new String[] {"abc"}));
+
+ // with null items
+ String[] arr = {"abc", null, "def"};
+ asgn = new BucketAssignments(arr);
+ assertEquals(asgn, asgn);
+ assertEquals(asgn, new BucketAssignments(arr));
+ assertEquals(asgn, new BucketAssignments(new String[] {"abc", null, "def"}));
+
+ assertNotEquals(asgn, new BucketAssignments());
+ assertNotEquals(asgn, new BucketAssignments(new String[] {"abc", null, "XYZ"}));
+
+ assertNotEquals(asgn, new BucketAssignments());
+ }
+
+ /**
+ * Expects an exception when checkValidity() is called.
+ *
+ * @param asgn assignments to be checked
+ */
+ private void expectException(BucketAssignments asgn) {
+ try {
+ asgn.checkValidity();
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
new file mode 100644
index 00000000..142ebbca
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class HeartbeatTest extends SupportBasicMessageTester<Heartbeat> {
+
+ /**
+ * Sequence number to validate time stamps within the heart beat.
+ */
+ private long sequence = 0;
+
+ public HeartbeatTest() {
+ super(Heartbeat.class);
+ }
+
+ @Override
+ public Heartbeat makeValidMessage() {
+ Heartbeat msg = new Heartbeat(VALID_HOST, ++sequence);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+ @Override
+ public void testDefaultConstructorFields(Heartbeat msg) {
+ super.testDefaultConstructorFields(msg);
+
+ assertEquals(sequence, msg.getTimestampMs());
+ }
+
+ @Override
+ public void testValidFields(Heartbeat msg) {
+ super.testValidFields(msg);
+
+ assertEquals(sequence, msg.getTimestampMs());
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
new file mode 100644
index 00000000..dced372d
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class IdentificationTest extends SupportMessageWithAssignmentsTester<Identification> {
+
+ public IdentificationTest() {
+ super(Identification.class);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ setNullAssignments(false);
+ }
+
+ /**
+ * The superclass will already invoke testJsonEncodeDecode() to verify that
+ * things work with a fully populated message. This verifies that it also
+ * works if the assignments are null.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ final void testJsonEncodeDecode_WithNullAssignments() throws Exception {
+ setNullAssignments(true);
+ testJsonEncodeDecode();
+ }
+
+ /**
+ * The superclass will already invoke testCheckValidity() to
+ * verify that things work with a fully populated message. This verifies
+ * that it also works if the assignments are null.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ void testCheckValidity_NullAssignments() throws Exception {
+ // null assignments are OK
+ Identification msg = makeValidMessage();
+ msg.setAssignments(null);
+ msg.checkValidity();
+ }
+
+ @Override
+ public Identification makeValidMessage() {
+ Identification msg = new Identification(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
new file mode 100644
index 00000000..15d9d338
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LeaderTest extends SupportMessageWithAssignmentsTester<Leader> {
+
+ public LeaderTest() {
+ super(Leader.class);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ setNullAssignments(false);
+ }
+
+ /**
+ * The superclass will already invoke testCheckValidity_InvalidFields() to
+ * verify that things work with a fully populated message. This verifies
+ * that it also works if the assignments are null.
+ *
+ */
+ @Test
+ void testCheckValidity_InvalidFields_NullAssignments() {
+ // null assignments are invalid
+ expectCheckValidityFailure(msg -> msg.setAssignments(null));
+ }
+
+ @Test
+ void testCheckValidity_SourceIsNotLeader() {
+ Leader ldr = makeValidMessage();
+
+ ldr.setSource("xyz");
+
+ // the source does not have an assignment
+ BucketAssignments asgnUnassigned = new BucketAssignments(new String[] {"abc", "def"});
+ expectCheckValidityFailure(msg -> msg.setAssignments(asgnUnassigned));
+
+ // the source is not the smallest UUID in this assignment
+ BucketAssignments asgnNotSmallest = new BucketAssignments(new String[] {VALID_HOST_PREDECESSOR, VALID_HOST});
+ expectCheckValidityFailure(msg -> msg.setAssignments(asgnNotSmallest));
+ }
+
+ @Override
+ public Leader makeValidMessage() {
+ Leader msg = new Leader(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java
new file mode 100644
index 00000000..cf2cb695
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/MessageTest.java
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+class MessageTest extends SupportBasicMessageTester<Message> {
+
+ public MessageTest() {
+ super(Message.class);
+ }
+
+ @Test
+ void testGetSource_testSetSource() {
+ Message msg = new Message();
+
+ msg.setSource("hello");
+ assertEquals("hello", msg.getSource());
+ assertNull(msg.getChannel());
+
+ msg.setSource("world");
+ assertEquals("world", msg.getSource());
+ assertNull(msg.getChannel());
+ }
+
+ @Test
+ void testGetChannel_testSetChannel() {
+ Message msg = new Message();
+
+ msg.setChannel("hello");
+ assertEquals("hello", msg.getChannel());
+ assertNull(msg.getSource());
+
+ msg.setChannel("world");
+ assertEquals("world", msg.getChannel());
+ assertNull(msg.getSource());
+ }
+
+ @Test
+ void testCheckValidity_InvalidFields() {
+ // null or empty source
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setSource(value));
+
+ // null or empty channel
+ expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setChannel(value));
+ }
+
+ /**
+ * Makes a message that will pass the validity check.
+ *
+ * @return a valid Message
+ */
+ public Message makeValidMessage() {
+ Message msg = new Message(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
new file mode 100644
index 00000000..29a7fad1
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+public class OfflineTest extends SupportBasicMessageTester<Offline> {
+
+ public OfflineTest() {
+ super(Offline.class);
+ }
+
+ @Override
+ public Offline makeValidMessage() {
+ Offline msg = new Offline(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
new file mode 100644
index 00000000..737bcc19
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+public class QueryTest extends SupportBasicMessageTester<Query> {
+
+ public QueryTest() {
+ super(Query.class);
+ }
+
+ @Override
+ public Query makeValidMessage() {
+ Query msg = new Query(VALID_HOST);
+ msg.setChannel(VALID_CHANNEL);
+
+ return msg;
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportBasicMessageTester.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportBasicMessageTester.java
new file mode 100644
index 00000000..2fe905a9
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportBasicMessageTester.java
@@ -0,0 +1,251 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import org.onap.policy.drools.pooling.Serializer;
+
+/**
+ * Superclass used to test subclasses of {@link Message}.
+ *
+ * @param <T> type of {@link Message} subclass that this tests
+ */
+public abstract class SupportBasicMessageTester<T extends Message> {
+ // values set by makeValidMessage()
+ public static final String VALID_HOST_PREDECESSOR = "hostA";
+ public static final String VALID_HOST = "hostB";
+ public static final String VALID_CHANNEL = "channelC";
+
+ /**
+ * Used to perform JSON serialization and de-serialization.
+ */
+ public final Serializer mapper = new Serializer();
+
+ /**
+ * The subclass of the type of Message being tested.
+ */
+ private final Class<T> subclazz;
+
+ /**
+ * Constructor.
+ *
+ * @param subclazz subclass of {@link Message} being tested
+ */
+ public SupportBasicMessageTester(Class<T> subclazz) {
+ this.subclazz = subclazz;
+ }
+
+ /**
+ * Creates a default Message and verifies that the source and channel are
+ * {@code null}.
+ *
+ */
+ @Test
+ public final void testDefaultConstructor() {
+ testDefaultConstructorFields(makeDefaultMessage());
+ }
+
+ /**
+ * Tests that the Message has the correct source, and that the channel is
+ * {@code null}.
+ *
+ */
+ @Test
+ public final void testConstructorWithArgs() {
+ testValidFields(makeValidMessage());
+ }
+
+ /**
+ * Makes a valid message and then verifies that it can be serialized and
+ * de-serialized. Verifies that the de-serialized message is of the same
+ * type, and has the same content, as the original.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public final void testJsonEncodeDecode() throws Exception {
+ T originalMsg = makeValidMessage();
+
+ Message msg;
+ if (originalMsg.getClass() == Message.class) {
+ msg = originalMsg;
+ } else {
+ msg = mapper.decodeMsg(mapper.encodeMsg(originalMsg));
+ }
+
+ assertEquals(subclazz, msg.getClass());
+
+ msg.checkValidity();
+
+ testValidFields(subclazz.cast(msg));
+ }
+
+ /**
+ * Creates a valid Message and verifies that checkValidity() passes.
+ *
+ * @throws PoolingFeatureException if an error occurs
+ */
+ @Test
+ public final void testCheckValidity_Ok() throws PoolingFeatureException {
+ T msg = makeValidMessage();
+ msg.checkValidity();
+
+ testValidFields(subclazz.cast(msg));
+ }
+
+ /**
+ * Creates a default Message and verifies that checkValidity() fails. Does
+ * not throw an exception.
+ */
+ @Test
+ public final void testCheckValidity_DefaultConstructor() {
+ try {
+ makeDefaultMessage().checkValidity();
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+ /**
+ * Creates a message via {@link #makeValidMessage()}, updates it via the
+ * given function, and then invokes the checkValidity() method on it. It is
+ * expected that the checkValidity() will throw an exception.
+ *
+ * @param func function to update the message prior to invoking
+ * checkValidity()
+ */
+ public void expectCheckValidityFailure(MessageUpdateFunction<T> func) {
+ try {
+ T msg = makeValidMessage();
+ func.update(msg);
+
+ msg.checkValidity();
+
+ fail("missing exception");
+
+ } catch (PoolingFeatureException expected) {
+ // success
+ }
+ }
+
+ /**
+ * Creates a message via {@link #makeValidMessage()}, updates one of its
+ * fields via the given function, and then invokes the checkValidity()
+ * method on it. It is expected that the checkValidity() will throw an
+ * exception. It checks both the case when the message's field is set to
+ * {@code null}, and when it is set to empty (i.e., "").
+ *
+ * @param func function to update the message's field prior to invoking
+ * checkValidity()
+ */
+ public void expectCheckValidityFailure_NullOrEmpty(MessageFieldUpdateFunction<T> func) {
+ expectCheckValidityFailure(msg -> func.update(msg, null));
+ expectCheckValidityFailure(msg -> func.update(msg, ""));
+ }
+
+ /**
+ * Makes a message using the default constructor.
+ *
+ * @return a new Message
+ */
+ public final T makeDefaultMessage() {
+ try {
+ return subclazz.getConstructor().newInstance();
+
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+
+
+ // the remaining methods will typically be overridden
+
+ /**
+ * Makes a message that will pass the validity check. Note: this should use
+ * the non-default constructor, and the source and channel should be set to
+ * VALID_HOST and VALID_CHANNEL, respectively.
+ *
+ * @return a valid Message
+ */
+ public abstract T makeValidMessage();
+
+ /**
+ * Verifies that fields are set as expected by
+ * {@link #makeDefaultMessage()}.
+ *
+ * @param msg the default Message
+ */
+ public void testDefaultConstructorFields(T msg) {
+ assertNull(msg.getSource());
+ assertNull(msg.getChannel());
+ }
+
+ /**
+ * Verifies that fields are set as expected by {@link #makeValidMessage()}.
+ *
+ * @param msg message whose fields are to be validated
+ */
+ public void testValidFields(T msg) {
+ assertEquals(VALID_HOST, msg.getSource());
+ assertEquals(VALID_CHANNEL, msg.getChannel());
+ }
+
+ /**
+ * Function that updates a message.
+ *
+ * @param <T> type of Message the function updates
+ */
+ @FunctionalInterface
+ public static interface MessageUpdateFunction<T extends Message> {
+
+ /**
+ * Updates a message.
+ *
+ * @param msg message to be updated
+ */
+ void update(T msg);
+ }
+
+ /**
+ * Function that updates a single field within a message.
+ *
+ * @param <T> type of Message the function updates
+ */
+ @FunctionalInterface
+ public static interface MessageFieldUpdateFunction<T extends Message> {
+
+ /**
+ * Updates a field within a message.
+ *
+ * @param msg message to be updated
+ * @param newValue new field value
+ */
+ void update(T msg, String newValue);
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportMessageWithAssignmentsTester.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportMessageWithAssignmentsTester.java
new file mode 100644
index 00000000..3814553f
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/message/SupportMessageWithAssignmentsTester.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Superclass used to test subclasses of {@link MessageWithAssignments}.
+ *
+ * @param <T> type of {@link MessageWithAssignments} subclass that this tests
+ */
+@Setter
+@Getter
+public abstract class SupportMessageWithAssignmentsTester<T extends MessageWithAssignments>
+ extends SupportBasicMessageTester<T> {
+ // values set by makeValidMessage()
+ public static final String[] VALID_ARRAY = {VALID_HOST, VALID_HOST + "_xxx"};
+ public static final BucketAssignments VALID_ASGN = new BucketAssignments(VALID_ARRAY);
+
+ /**
+ * {@code True} if {@code null} assignments are allowed, {@code false}
+ * otherwise.
+ */
+ private boolean nullAssignments;
+
+ /**
+ * Constructor.
+ *
+ * @param subclazz subclass of {@link MessageWithAssignments} being tested
+ */
+ public SupportMessageWithAssignmentsTester(Class<T> subclazz) {
+ super(subclazz);
+ }
+
+ @Test
+ public void testCheckValidity_InvalidFields() {
+ // null source (i.e., superclass field)
+ expectCheckValidityFailure(msg -> msg.setSource(null));
+
+ // empty assignments
+ expectCheckValidityFailure(msg -> msg.setAssignments(new BucketAssignments(new String[0])));
+
+ // invalid assignment
+ String[] invalidAssignment = {"abc", null};
+ expectCheckValidityFailure(msg -> msg.setAssignments(new BucketAssignments(invalidAssignment)));
+ }
+
+ @Test
+ public void testGetAssignments_testSetAssignments() {
+ MessageWithAssignments msg = makeValidMessage();
+
+ // from constructor
+ assertEquals(VALID_ASGN, msg.getAssignments());
+
+ BucketAssignments asgn = new BucketAssignments();
+ msg.setAssignments(asgn);
+ assertEquals(asgn, msg.getAssignments());
+ }
+
+ @Override
+ public void testDefaultConstructorFields(T msg) {
+ super.testDefaultConstructorFields(msg);
+
+ assertNull(msg.getAssignments());
+ }
+
+ @Override
+ public void testValidFields(T msg) {
+ super.testValidFields(msg);
+
+ if (nullAssignments) {
+ assertNull(msg.getAssignments());
+
+ } else {
+ assertEquals(VALID_ASGN, msg.getAssignments());
+ }
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
new file mode 100644
index 00000000..ce9adb9f
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -0,0 +1,470 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+class ActiveStateTest extends SupportBasicStateTester {
+
+ private ActiveState state;
+
+ /**
+ * Setup.
+ */
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ActiveState(mgr);
+ }
+
+ @Test
+ void testStart() {
+ state.start();
+
+ // ensure the timers were created
+ verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class));
+
+ // ensure a heart beat was generated
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.getRight().getSource());
+ }
+
+ @Test
+ void testProcessHeartbeat_NullHost() {
+ assertNull(state.process(new Heartbeat()));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ void testProcessHeartbeat_MyHost() {
+ assertNull(state.process(new Heartbeat(MY_HOST, 0L)));
+
+ assertTrue(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ void testProcessHeartbeat_Predecessor() {
+ assertNull(state.process(new Heartbeat(HOST2, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertTrue(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ void testProcessHeartbeat_OtherHost() {
+ assertNull(state.process(new Heartbeat(HOST3, 0L)));
+
+ assertFalse(state.isMyHeartbeatSeen());
+ assertFalse(state.isPredHeartbeatSeen());
+
+ verify(mgr, never()).goInactive();
+ verify(mgr, never()).goQuery();
+ }
+
+ @Test
+ void testProcessOffline_NullHost() {
+ // should be ignored
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ void testProcessOffline_UnassignedHost() {
+ // HOST4 is not in the assignment list - should be ignored
+ assertNull(state.process(new Offline(HOST4)));
+ }
+
+ @Test
+ void testProcessOffline_IAmLeader() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // one of the assigned hosts went offline
+ assertEquals(next, state.process(new Offline(HOST1)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ void testProcessOffline_PredecessorIsLeaderNowOffline() {
+ // configure the next state
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // I am not the leader, but my predecessor was
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ // my predecessor went offline
+ assertEquals(next, state.process(new Offline(PREV_HOST)));
+
+ // should have sent a new Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+
+ assertEquals(MY_HOST, msg.getSource());
+
+ // check new bucket assignments
+ assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray()));
+ }
+
+ @Test
+ void testProcessOffline__PredecessorIsNotLeaderNowOffline() {
+ // I am not the leader, and neither is my predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2}));
+ state = new ActiveState(mgr);
+
+ /*
+ *
+ * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus
+ * should be ignored.
+ */
+ assertNull(state.process(new Offline(PREV_HOST2)));
+ }
+
+ @Test
+ void testProcessOffline_OtherAssignedHostOffline() {
+ // I am not the leader
+ mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
+ state = new ActiveState(mgr);
+
+ /*
+ * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus
+ * should be ignored.
+ */
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testProcessLeader_BadLeader() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ // now send a Leader message for that leader
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // should go Query, but not start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr, never()).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessLeader_GoodLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ // now send a Leader message for that leader
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testActiveState() {
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+
+ // verify that it determined its neighbors
+ assertEquals(HOST1, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+ }
+
+ @Test
+ void testDetmNeighbors() {
+ // if only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+ assertNull(state.getSuccHost());
+ assertEquals("", state.getPredHost());
+
+ // two hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST2, state.getPredHost());
+
+ // three hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST3, state.getPredHost());
+
+ // more hosts
+ mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4}));
+ state = new ActiveState(mgr);
+ assertEquals(HOST2, state.getSuccHost());
+ assertEquals(HOST4, state.getPredHost());
+ }
+
+ @Test
+ void testAddTimers_WithPredecessor() {
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(3, repeatedSchedules.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
+
+ // predecessor's heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
+ }
+
+ @Test
+ void testAddTimers_SansPredecessor() {
+ // only one host, thus no predecessor
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ assertEquals(2, repeatedSchedules.size());
+
+ Triple<Long, Long, StateTimerTask> timer;
+
+ // heart beat generator
+ timer = repeatedTasks.remove();
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
+
+ // my heart beat checker
+ timer = repeatedTasks.remove();
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
+ }
+
+ @Test
+ void testAddTimers_HeartbeatGenerator() {
+ // only one host so we only have to look at one heart beat at a time
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove();
+
+ verify(mgr).publish(anyString(), any(Heartbeat.class));
+
+ // fire the task
+ assertNull(task.getRight().fire());
+
+ // should have generated a second pair of heart beats
+ verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
+ }
+
+ @Test
+ void testAddTimers_MyHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // indicate that this host is still alive
+ state.process(new Heartbeat(MY_HOST, 0L));
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // fire the task - should not transition
+ assertNull(task.getRight().fire());
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ void testAddTimers_MyHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.getRight().fire());
+
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
+
+ // should publish an offline message
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testAddTimers_PredecessorHeartbeatSeen() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // indicate that the predecessor is still alive
+ state.process(new Heartbeat(HOST2, 0L));
+
+ // set up next state, just in case
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should NOT transition
+ assertNull(task.getRight().fire());
+
+ verify(mgr, never()).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ void testAddTimers_PredecessorHeartbeatMissed() {
+ // invoke start() to add the timers
+ state.start();
+
+ Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
+
+ // set up next state
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ // fire the task - should transition
+ assertEquals(next, task.getRight().fire());
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ void testGenHeartbeat_OneHost() {
+ // only one host (i.e., itself)
+ mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
+ state = new ActiveState(mgr);
+
+ state.start();
+
+ verify(mgr, times(1)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
+ }
+
+ @Test
+ void testGenHeartbeat_MultipleHosts() {
+ state.start();
+
+ verify(mgr, times(2)).publish(any(), any());
+
+ Pair<String, Heartbeat> msg;
+ int index = 0;
+
+ // this message should go to itself
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
+
+ // this message should go to its successor
+ msg = capturePublishedMessage(Heartbeat.class, index++);
+ assertEquals(HOST1, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
new file mode 100644
index 00000000..51e27738
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+class IdleStateTest extends SupportBasicStateTester {
+
+ private IdleState state;
+
+ /**
+ * Setup.
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new IdleState(mgr);
+ }
+
+ @Test
+ void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ void testProcessIdentification() {
+ assertNull(state.process(new Identification(PREV_HOST, null)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ void testProcessLeader() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, PREV_HOST, MY_HOST});
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should stay in current state, but start distributing
+ assertNull(state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessOffline() {
+ assertNull(state.process(new Offline(PREV_HOST)));
+ verifyNothingPublished();
+ }
+
+ @Test
+ void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ verifyNothingPublished();
+ }
+
+ /**
+ * Verifies that nothing was published on either channel.
+ */
+ private void verifyNothingPublished() {
+ verify(mgr, never()).publish(any(), any());
+ verify(mgr, never()).publishAdmin(any());
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
new file mode 100644
index 00000000..a5ee2d06
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+
+class InactiveStateTest extends SupportBasicStateTester {
+
+ private InactiveState state;
+
+ /**
+ * Setup.
+ *
+ */
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new InactiveState(mgr);
+ }
+
+ @Test
+ void testProcessLeader() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ String[] arr = {PREV_HOST, MY_HOST, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ void testGoInatcive() {
+ assertNull(state.goInactive());
+ }
+
+ @Test
+ void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_REACTIVATE_WAIT_MS, timer.getLeft().longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ assertEquals(next, timer.getRight().fire());
+ }
+
+ @Test
+ void testInactiveState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
new file mode 100644
index 00000000..dbac7619
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -0,0 +1,396 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
+import org.onap.policy.drools.pooling.state.ProcessingState.HostBucket;
+
+class ProcessingStateTest extends SupportBasicStateTester {
+
+ private ProcessingState state;
+ private HostBucket hostBucket;
+
+ /**
+ * Setup.
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new ProcessingState(mgr, MY_HOST);
+ hostBucket = new HostBucket(MY_HOST);
+ }
+
+ @Test
+ void testProcessQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(new Query()));
+
+ Identification ident = captureAdminMessage(Identification.class);
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ void testProcessingState() {
+ /*
+ * Null assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Empty assignments should be OK.
+ */
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ state = new ProcessingState(mgr, LEADER);
+ assertEquals(MY_HOST, state.getHost());
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ /*
+ * Now try something with assignments.
+ */
+ when(mgr.getAssignments()).thenReturn(ASGN3);
+ state = new ProcessingState(mgr, LEADER);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+
+ assertEquals(LEADER, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testProcessingState_NullLeader() {
+ when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
+ assertThrows(NullPointerException.class, () -> state = new ProcessingState(mgr, null));
+ }
+
+ @Test
+ void testProcessingState_ZeroLengthHostArray() {
+ when(mgr.getAssignments()).thenReturn(new BucketAssignments(new String[] {}));
+ assertThrows(IllegalArgumentException.class, () -> state = new ProcessingState(mgr, LEADER));
+ }
+
+ @Test
+ void testGetAssignments() {
+ // assignments from constructor
+ assertEquals(ASGN3, state.getAssignments());
+
+ // null assignments - no effect
+ state.setAssignments(null);
+ assertEquals(ASGN3, state.getAssignments());
+
+ // empty assignments
+ state.setAssignments(EMPTY_ASGN);
+ assertEquals(EMPTY_ASGN, state.getAssignments());
+
+ // non-empty assignments
+ state.setAssignments(ASGN3);
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testSetAssignments() {
+ state.setAssignments(null);
+ verify(mgr, never()).startDistributing(any());
+
+ state.setAssignments(ASGN3);
+ verify(mgr).startDistributing(ASGN3);
+ }
+
+ @Test
+ void testGetLeader() {
+ // check value from constructor
+ assertEquals(MY_HOST, state.getLeader());
+
+ state.setLeader(HOST2);
+ assertEquals(HOST2, state.getLeader());
+
+ state.setLeader(HOST3);
+ assertEquals(HOST3, state.getLeader());
+ }
+
+ @Test
+ void testSetLeader() {
+ state.setLeader(MY_HOST);
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ void testSetLeader_Null() {
+ assertThrows(NullPointerException.class, () -> state.setLeader(null));
+ }
+
+ @Test
+ void testIsLeader() {
+ state.setLeader(MY_HOST);
+ assertTrue(state.isLeader());
+
+ state.setLeader(HOST2);
+ assertFalse(state.isLeader());
+ }
+
+ @Test
+ void testBecomeLeader() {
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, state.becomeLeader(sortHosts(MY_HOST, HOST2)));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ verify(mgr).startDistributing(msg.getAssignments());
+ verify(mgr).goActive();
+ }
+
+ @Test
+ void testBecomeLeader_NotFirstAlive() {
+ // alive list contains something before my host name
+ var sortedHosts = sortHosts(PREV_HOST, MY_HOST);
+ assertThrows(IllegalArgumentException.class, () -> state.becomeLeader(sortedHosts));
+ }
+
+ @Test
+ void testMakeLeader() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ Leader msg = captureAdminMessage(Leader.class);
+
+ // need a channel before invoking checkValidity()
+ msg.setChannel(Message.ADMIN);
+
+ msg.checkValidity();
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertNotNull(msg.getAssignments());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ assertTrue(msg.getAssignments().hasAssignment(HOST2));
+
+ // this one wasn't in the list of hosts, so it should have been removed
+ assertFalse(msg.getAssignments().hasAssignment(HOST1));
+ }
+
+ @Test
+ void testMakeAssignments() throws Exception {
+ state.becomeLeader(sortHosts(MY_HOST, HOST2));
+
+ captureAssignments().checkValidity();
+ }
+
+ @Test
+ void testMakeBucketArray_NullAssignments() {
+ when(mgr.getAssignments()).thenReturn(null);
+ state = new ProcessingState(mgr, MY_HOST);
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.stream(arr).allMatch(MY_HOST::equals));
+ }
+
+ @Test
+ void testMakeBucketArray_ZeroAssignments() {
+ // bucket assignment with a zero-length array
+ state.setAssignments(new BucketAssignments(new String[0]));
+
+ state.becomeLeader(sortHosts(MY_HOST));
+
+ String[] arr = captureHostArray();
+
+ assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
+
+ assertTrue(Arrays.stream(arr).allMatch(MY_HOST::equals));
+ }
+
+ @Test
+ void testMakeBucketArray() {
+ /*
+ * All hosts are still alive, so it should have the exact same assignments as it
+ * had to start.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(HOST_ARR3));
+
+ String[] arr = captureHostArray();
+
+ assertNotSame(HOST_ARR3, arr);
+ assertEquals(Arrays.asList(HOST_ARR3), Arrays.asList(arr));
+ }
+
+ @Test
+ void testRemoveExcessHosts() {
+ /*
+ * All hosts are still alive, plus some others.
+ */
+ state.setAssignments(ASGN3);
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3, HOST4));
+
+ // assignments should be unchanged
+ assertEquals(Arrays.asList(HOST_ARR3), captureHostList());
+ }
+
+ @Test
+ void testAddIndicesToHostBuckets() {
+ // some are null, some hosts are no longer alive
+ String[] asgn = {null, MY_HOST, HOST3, null, HOST4, HOST1, HOST2};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ // every bucket should be assigned to one of the three hosts
+ String[] expected = {MY_HOST, MY_HOST, HOST1, HOST2, MY_HOST, HOST1, HOST2};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ void testAssignNullBuckets() {
+ /*
+ * Ensure buckets are assigned to the host with the fewest buckets.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, null, null, null, null, null, MY_HOST};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
+
+ String[] expected = {MY_HOST, HOST1, MY_HOST, HOST2, HOST1, HOST2, HOST1, HOST2, MY_HOST};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ void testRebalanceBuckets() {
+ /*
+ * Some are very lopsided.
+ */
+ String[] asgn = {MY_HOST, HOST1, MY_HOST, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+
+ state.setAssignments(new BucketAssignments(asgn));
+ state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3));
+
+ String[] expected = {HOST2, HOST1, HOST3, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
+ assertEquals(Arrays.asList(expected), captureHostList());
+ }
+
+ @Test
+ void testHostBucketRemove_testHostBucketAdd_testHostBucketSize() {
+ assertEquals(0, hostBucket.size());
+
+ hostBucket.add(20);
+ hostBucket.add(30);
+ hostBucket.add(40);
+ assertEquals(3, hostBucket.size());
+
+ assertEquals(20, hostBucket.remove().intValue());
+ assertEquals(30, hostBucket.remove().intValue());
+ assertEquals(1, hostBucket.size());
+
+ // add more before taking the last item
+ hostBucket.add(50);
+ hostBucket.add(60);
+ assertEquals(3, hostBucket.size());
+
+ assertEquals(40, hostBucket.remove().intValue());
+ assertEquals(50, hostBucket.remove().intValue());
+ assertEquals(60, hostBucket.remove().intValue());
+ assertEquals(0, hostBucket.size());
+
+ // add more AFTER taking the last item
+ hostBucket.add(70);
+ assertEquals(70, hostBucket.remove().intValue());
+ assertEquals(0, hostBucket.size());
+ }
+
+ @Test
+ void testHostBucketCompareTo() {
+ HostBucket hb1 = new HostBucket(PREV_HOST);
+ HostBucket hb2 = new HostBucket(MY_HOST);
+
+ assertEquals(0, hb1.compareTo(hb1));
+ assertEquals(0, hb1.compareTo(new HostBucket(PREV_HOST)));
+
+ // both empty
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+
+ // hb1 has one bucket, so it should not be larger
+ hb1.add(100);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // hb1 has two buckets, so it should still be larger
+ hb1.add(200);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // hb1 has two buckets, hb2 has one, so hb1 should still be larger
+ hb2.add(1000);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // same number of buckets, so hb2 should now be larger
+ hb2.add(2000);
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+
+ // hb2 has more buckets, it should still be larger
+ hb2.add(3000);
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+ }
+
+ @Test
+ void testHostBucketHashCode() {
+ assertThrows(UnsupportedOperationException.class, () -> hostBucket.hashCode());
+ }
+
+ @Test
+ void testHostBucketEquals() {
+ assertThrows(UnsupportedOperationException.class, () -> hostBucket.equals(hostBucket));
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
new file mode 100644
index 00000000..aae6e056
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -0,0 +1,444 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+
+class QueryStateTest extends SupportBasicStateTester {
+
+ private QueryState state;
+
+ /**
+ * Setup.
+ */
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new QueryState(mgr);
+ }
+
+ @Test
+ void testGoQuery() {
+ assertNull(state.goQuery());
+ }
+
+ @Test
+ void testStart() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
+ }
+
+ @Test
+ void testProcessIdentification_SameSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNull(state.process(new Identification(MY_HOST, asgn)));
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ verify(mgr, never()).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessIdentification_DiffSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertNull(state.process(new Identification(HOST2, asgn)));
+
+ // leader should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // should have picked up the assignments
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testProcessLeader_SameLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ // identify a leader that's better than my host
+ assertNull(state.process(new Identification(PREV_HOST, asgn)));
+
+ // now send a Leader message for that leader
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr, never()).goInactive();
+
+ // Ident msg + Leader msg = times(2)
+ verify(mgr, times(2)).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessLeader_BetterLeaderWithAssignment() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ void testProcessLeader_BetterLeaderWithoutAssignment() {
+ String[] arr = {HOST2, PREV_HOST, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should go Inactive, but start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goActive();
+ }
+
+ @Test
+ void testProcessLeader_NotABetterLeader() {
+ // no assignments yet
+ mgr.startDistributing(null);
+ state = new QueryState(mgr);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ Leader msg = new Leader(HOST1, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ // should stay in the same state
+ assertNull(state.process(msg));
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // should have started distributing
+ verify(mgr).startDistributing(asgn);
+
+ // this host should still be the leader
+ assertEquals(MY_HOST, state.getLeader());
+
+ // new assignments
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testProcessOffline_NullHost() {
+ assertNull(state.process(new Offline()));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ void testProcessOffline_SameHost() {
+ assertNull(state.process(new Offline(MY_HOST)));
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ void testProcessOffline_DiffHost() {
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ // tell it that the hosts are alive
+ state.process(new Identification(PREV_HOST, asgn));
+ state.process(new Identification(HOST1, asgn));
+
+ // #2 goes offline
+ assertNull(state.process(new Offline(HOST1)));
+
+ // #1 should still be the leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // #1 goes offline
+ assertNull(state.process(new Offline(PREV_HOST)));
+
+ // this should still be the leader now
+ assertEquals(MY_HOST, state.getLeader());
+ }
+
+ @Test
+ void testQueryState() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testAwaitIdentification_MissingSelfIdent() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
+
+ // should published an Offline message and go inactive
+
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ assertEquals(next, timer.getRight().fire());
+
+ // should continue distributing
+ verify(mgr, never()).startDistributing(null);
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testAwaitIdentification_Leader() {
+ state.start();
+ state.process(new Identification(MY_HOST, null));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.getRight().fire());
+
+ // should have published a Leader message
+ Leader msg = captureAdminMessage(Leader.class);
+ assertEquals(MY_HOST, msg.getSource());
+ assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
+ }
+
+ @Test
+ void testAwaitIdentification_HasAssignment() {
+ // not the leader, but has an assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+ state.process(new Identification(MY_HOST, null));
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
+
+ // set up active state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
+
+ assertEquals(next, timer.getRight().fire());
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+
+ // should have gone active with the current assignments
+ verify(mgr).goActive();
+ }
+
+ @Test
+ void testAwaitIdentification_NoAssignment() {
+ // not the leader and no assignment
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ mgr.startDistributing(asgn);
+ state = new QueryState(mgr);
+
+ state.start();
+ state.process(new Identification(MY_HOST, null));
+
+ // tell it the leader is still active
+ state.process(new Identification(PREV_HOST, asgn));
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
+
+ // set up inactive state, as that's what it should return
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.getRight().fire());
+
+ // should NOT have published a Leader message
+ assertTrue(admin.isEmpty());
+ }
+
+ @Test
+ void testRecordInfo_NullSource() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(null, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_SourcePreceedsMyHost() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(PREV_HOST, asgn));
+
+ // new leader
+ assertEquals(PREV_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_SourceFollowsMyHost() {
+ mgr.startDistributing(null);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ // leader unchanged
+ assertEquals(MY_HOST, state.getLeader());
+
+ // assignments still updated
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_NewIsNull() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(HOST1, null));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_NewIsEmpty() {
+ state.setAssignments(ASGN3);
+ state.process(new Identification(PREV_HOST, new BucketAssignments()));
+
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_OldIsNull() {
+ mgr.startDistributing(null);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_OldIsEmpty() {
+ state.setAssignments(new BucketAssignments());
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
+ state.process(new Identification(HOST1, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_NewLeaderPreceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
+ state.process(new Identification(HOST3, asgn));
+
+ assertEquals(asgn, state.getAssignments());
+ }
+
+ @Test
+ void testRecordInfo_NewLeaderSucceedsOld() {
+ state.setAssignments(ASGN3);
+ state.setLeader(MY_HOST);
+
+ BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3});
+ state.process(new Identification(HOST3, asgn));
+
+ // should be unchanged
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
new file mode 100644
index 00000000..1a85304f
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -0,0 +1,184 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+class StartStateTest extends SupportBasicStateTester {
+
+ private StartState state;
+
+ /**
+ * Setup.
+ */
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new StartState(mgr);
+ }
+
+ @Test
+ void testStart() {
+ state.start();
+
+ Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
+
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(state.getHbTimestampMs(), msg.getRight().getTimestampMs());
+
+
+ /*
+ * Verify heartbeat generator
+ */
+ Triple<Long, Long, StateTimerTask> generator = repeatedTasks.removeFirst();
+
+ assertEquals(STD_INTER_HEARTBEAT_MS, generator.getLeft().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, generator.getMiddle().longValue());
+
+ // invoke the task - it should generate another heartbeat
+ assertNull(generator.getRight().fire());
+ verify(mgr, times(2)).publish(MY_HOST, msg.getRight());
+
+ // and again
+ assertNull(generator.getRight().fire());
+ verify(mgr, times(3)).publish(MY_HOST, msg.getRight());
+
+
+ /*
+ * Verify heartbeat checker
+ */
+ Pair<Long, StateTimerTask> checker = onceTasks.removeFirst();
+
+ assertEquals(STD_HEARTBEAT_WAIT_MS, checker.getLeft().longValue());
+
+ // invoke the task - it should go to the state returned by the mgr
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, checker.getRight().fire());
+
+ verify(mgr).startDistributing(null);
+ }
+
+ @Test
+ void testStartStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testStartStateState() {
+ // create a new state from the current state
+ state = new StartState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testProcessHeartbeat() {
+ Heartbeat msg = new Heartbeat();
+
+ // no matching data in heart beat
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same source, different time stamp
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs() - 1);
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // same time stamp, different source
+ msg.setSource("unknown");
+ msg.setTimestampMs(state.getHbTimestampMs());
+ assertNull(state.process(msg));
+ verify(mgr, never()).publishAdmin(any());
+
+ // matching heart beat
+ msg.setSource(MY_HOST);
+ msg.setTimestampMs(state.getHbTimestampMs());
+
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ assertEquals(next, state.process(msg));
+
+ verify(mgr).publishAdmin(any(Query.class));
+ }
+
+ @Test
+ void testProcessIdentification() {
+ assertNull(state.process(new Identification(MY_HOST, null)));
+ }
+
+ @Test
+ void testProcessLeader() {
+ assertNull(state.process(new Leader(MY_HOST, null)));
+ }
+
+ @Test
+ void testProcessOffline() {
+ assertNull(state.process(new Offline(HOST1)));
+ }
+
+ @Test
+ void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ void testGetHbTimestampMs() {
+ long tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+
+ tcurrent = System.currentTimeMillis();
+ assertTrue(new StartState(mgr).getHbTimestampMs() >= tcurrent);
+ }
+
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
new file mode 100644
index 00000000..cfae6f3c
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -0,0 +1,466 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+class StateTest extends SupportBasicStateTester {
+
+ private State state;
+
+ /**
+ * Setup.
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ state = new MyState(mgr);
+ }
+
+ @Test
+ void testStatePoolingManager() {
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testStateState() {
+ // allocate a new state, copying from the old state
+ state = new MyState(mgr);
+
+ /*
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
+ */
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testCancelTimers() {
+ int delay = 100;
+ int initDelay = 200;
+
+ /*
+ * Create three tasks tasks.
+ */
+
+ StateTimerTask task1 = mock(StateTimerTask.class);
+ StateTimerTask task2 = mock(StateTimerTask.class);
+ StateTimerTask task3 = mock(StateTimerTask.class);
+
+ // two tasks via schedule()
+ state.schedule(delay, task1);
+ state.schedule(delay, task2);
+
+ // one task via scheduleWithFixedDelay()
+ state.scheduleWithFixedDelay(initDelay, delay, task3);
+
+ // ensure all were scheduled, but not yet canceled
+ verify(mgr).schedule(delay, task1);
+ verify(mgr).schedule(delay, task2);
+ verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
+
+ CancellableScheduledTask sched1 = onceSchedules.removeFirst();
+ CancellableScheduledTask sched2 = onceSchedules.removeFirst();
+ CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
+
+ verify(sched1, never()).cancel();
+ verify(sched2, never()).cancel();
+ verify(sched3, never()).cancel();
+
+ /*
+ * Cancel the timers.
+ */
+ state.cancelTimers();
+
+ // verify that all were cancelled
+ verify(sched1).cancel();
+ verify(sched2).cancel();
+ verify(sched3).cancel();
+ }
+
+ @Test
+ void testStart() {
+ assertThatCode(() -> state.start()).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testGoStart() {
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ State next2 = state.goStart();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ void testGoQuery() {
+ State next = mock(State.class);
+ when(mgr.goQuery()).thenReturn(next);
+
+ State next2 = state.goQuery();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ void testGoActive_WithAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(act, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testGoActive_WithoutAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(inact, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testGoActive_NullAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ assertEquals(inact, state.goActive(null));
+
+ verify(mgr, never()).startDistributing(any());
+ }
+
+ @Test
+ void testGoInactive() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.goInactive();
+ assertEquals(next, next2);
+ }
+
+ @Test
+ void testProcessHeartbeat() {
+ assertNull(state.process(new Heartbeat()));
+ }
+
+ @Test
+ void testProcessIdentification() {
+ assertNull(state.process(new Identification()));
+ }
+
+ @Test
+ void testProcessLeader() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST1, asgn);
+
+ // should ignore it
+ assertNull(state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+ }
+
+ @Test
+ void testIsValidLeader_NullAssignment() {
+ assertFalse(state.isValid(new Leader(PREV_HOST, null)));
+ }
+
+ @Test
+ void testIsValidLeader_NullSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ assertFalse(state.isValid(new Leader(null, asgn)));
+ }
+
+ @Test
+ void testIsValidLeader_EmptyAssignment() {
+ assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
+ }
+
+ @Test
+ void testIsValidLeader_FromSelf() {
+ String[] arr = {HOST2, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
+ }
+
+ @Test
+ void testIsValidLeader_WrongLeader() {
+ String[] arr = {HOST2, HOST3};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertFalse(state.isValid(new Leader(HOST1, asgn)));
+ }
+
+ @Test
+ void testIsValidLeader() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertTrue(state.isValid(new Leader(HOST1, asgn)));
+ }
+
+ @Test
+ void testProcessOffline() {
+ assertNull(state.process(new Offline()));
+ }
+
+ @Test
+ void testProcessQuery() {
+ assertNull(state.process(new Query()));
+ }
+
+ @Test
+ void testPublishIdentification() {
+ Identification msg = new Identification();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ void testPublishLeader() {
+ Leader msg = new Leader();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ void testPublishOffline() {
+ Offline msg = new Offline();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ void testPublishQuery() {
+ Query msg = new Query();
+ state.publish(msg);
+
+ verify(mgr).publishAdmin(msg);
+ }
+
+ @Test
+ void testPublishStringHeartbeat() {
+ String chnl = "channelH";
+ Heartbeat msg = new Heartbeat();
+
+ state.publish(chnl, msg);
+
+ verify(mgr).publish(chnl, msg);
+ }
+
+ @Test
+ void testStartDistributing() {
+ BucketAssignments asgn = new BucketAssignments();
+ state.startDistributing(asgn);
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ void testStartDistributing_NullAssignments() {
+ state.startDistributing(null);
+
+ verify(mgr, never()).startDistributing(any());
+ }
+
+ @Test
+ void testSchedule() {
+ int delay = 100;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.schedule(delay, task);
+
+ CancellableScheduledTask sched = onceSchedules.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).schedule(delay, task);
+ verify(sched, never()).cancel();
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel its timers
+ * and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(sched).cancel();
+ }
+
+ @Test
+ void testScheduleWithFixedDelay() {
+ int initdel = 100;
+ int delay = 200;
+
+ StateTimerTask task = mock(StateTimerTask.class);
+
+ state.scheduleWithFixedDelay(initdel, delay, task);
+
+ CancellableScheduledTask sched = repeatedSchedules.removeFirst();
+
+ // scheduled, but not canceled yet
+ verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
+ verify(sched, never()).cancel();
+
+ /*
+ * Ensure the state added the timer to its list by telling it to cancel its timers
+ * and then seeing if this timer was canceled.
+ */
+ state.cancelTimers();
+ verify(sched).cancel();
+ }
+
+ @Test
+ void testMissedHeartbeat() {
+ State next = mock(State.class);
+ when(mgr.goStart()).thenReturn(next);
+
+ State next2 = state.missedHeartbeat();
+ assertEquals(next, next2);
+
+ // should continue to distribute
+ verify(mgr, never()).startDistributing(null);
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testInternalTopicFailed() {
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ State next2 = state.internalTopicFailed();
+ assertEquals(next, next2);
+
+ // should stop distributing
+ verify(mgr).startDistributing(null);
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testMakeHeartbeat() {
+ long timestamp = 30000L;
+ Heartbeat msg = state.makeHeartbeat(timestamp);
+
+ assertEquals(MY_HOST, msg.getSource());
+ assertEquals(timestamp, msg.getTimestampMs());
+ }
+
+ @Test
+ void testMakeIdentification() {
+ Identification ident = state.makeIdentification();
+ assertEquals(MY_HOST, ident.getSource());
+ assertEquals(ASGN3, ident.getAssignments());
+ }
+
+ @Test
+ void testMakeOffline() {
+ Offline msg = state.makeOffline();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testMakeQuery() {
+ Query msg = state.makeQuery();
+
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
+ void testGetHost() {
+ assertEquals(MY_HOST, state.getHost());
+ }
+
+ @Test
+ void testGetTopic() {
+ assertEquals(MY_TOPIC, state.getTopic());
+ }
+
+ /**
+ * State used for testing purposes, with abstract methods implemented.
+ */
+ private static class MyState extends State {
+
+ public MyState(PoolingManager mgr) {
+ super(mgr);
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
new file mode 100644
index 00000000..8f2c9160
--- /dev/null
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
@@ -0,0 +1,282 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+
+/**
+ * Superclass used to test subclasses of {@link State}.
+ */
+public class SupportBasicStateTester {
+
+ protected static final long STD_HEARTBEAT_WAIT_MS = 10;
+ protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
+ protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
+ protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
+ protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+
+ protected static final String MY_TOPIC = "myTopic";
+
+ protected static final String PREV_HOST = "prevHost";
+ protected static final String PREV_HOST2 = PREV_HOST + "A";
+
+ // this follows PREV_HOST, alphabetically
+ protected static final String MY_HOST = PREV_HOST + "X";
+
+ // these follow MY_HOST, alphabetically
+ protected static final String HOST1 = MY_HOST + "1";
+ protected static final String HOST2 = MY_HOST + "2";
+ protected static final String HOST3 = MY_HOST + "3";
+ protected static final String HOST4 = MY_HOST + "4";
+
+ protected static final String LEADER = HOST1;
+
+ protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
+
+ protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
+ protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
+
+ /**
+ * Scheduled tasks returned by schedule().
+ */
+ protected LinkedList<CancellableScheduledTask> onceSchedules;
+
+ /**
+ * Tasks captured via schedule().
+ */
+ protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
+
+ /**
+ * Scheduled tasks returned by scheduleWithFixedDelay().
+ */
+ protected LinkedList<CancellableScheduledTask> repeatedSchedules;
+
+ /**
+ * Tasks captured via scheduleWithFixedDelay().
+ */
+ protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
+
+ /**
+ * Messages captured via publish().
+ */
+ protected LinkedList<Pair<String, Message>> published;
+
+ /**
+ * Messages captured via publishAdmin().
+ */
+ protected LinkedList<Message> admin;
+
+ protected PoolingManager mgr;
+ protected PoolingProperties props;
+ protected State prevState;
+
+ public SupportBasicStateTester() {
+ super();
+ }
+
+ /**
+ * Setup.
+ *
+ * @throws Exception throws exception
+ */
+ public void setUp() throws Exception {
+ onceSchedules = new LinkedList<>();
+ onceTasks = new LinkedList<>();
+
+ repeatedSchedules = new LinkedList<>();
+ repeatedTasks = new LinkedList<>();
+
+ published = new LinkedList<>();
+ admin = new LinkedList<>();
+
+ mgr = mock(PoolingManager.class);
+ props = mock(PoolingProperties.class);
+
+ when(mgr.getHost()).thenReturn(MY_HOST);
+ when(mgr.getTopic()).thenReturn(MY_TOPIC);
+ when(mgr.getProperties()).thenReturn(props);
+
+ when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
+ when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
+ when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
+ when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
+ when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+
+ prevState = new State(mgr) {};
+
+ // capture publish() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ published.add(Pair.of((String) args[0], (Message) args[1]));
+
+ return null;
+ }).when(mgr).publish(anyString(), any(Message.class));
+
+ // capture publishAdmin() arguments
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ admin.add((Message) args[0]);
+
+ return null;
+ }).when(mgr).publishAdmin(any(Message.class));
+
+ // capture schedule() arguments, and return a new future
+ when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
+
+ CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
+ onceSchedules.add(sched);
+ return sched;
+ });
+
+ // capture scheduleWithFixedDelay() arguments, and return a new future
+ when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ repeatedTasks.add(Triple.of((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
+
+ CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
+ repeatedSchedules.add(sched);
+ return sched;
+ });
+
+ // get/set assignments in the manager
+ AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
+
+ when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
+
+ doAnswer(args -> {
+ asgn.set(args.getArgument(0));
+ return null;
+ }).when(mgr).startDistributing(any());
+ }
+
+ /**
+ * Makes a sorted set of hosts.
+ *
+ * @param hosts the hosts to be sorted
+ * @return the set of hosts, sorted
+ */
+ protected SortedSet<String> sortHosts(String... hosts) {
+ return new TreeSet<>(Arrays.asList(hosts));
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin channel.
+ *
+ * @return the host array, as a list
+ */
+ protected List<String> captureHostList() {
+ return Arrays.asList(captureHostArray());
+ }
+
+ /**
+ * Captures the host array from the Leader message published to the admin channel.
+ *
+ * @return the host array
+ */
+ protected String[] captureHostArray() {
+ BucketAssignments asgn = captureAssignments();
+
+ String[] arr = asgn.getHostArray();
+ assertNotNull(arr);
+
+ return arr;
+ }
+
+ /**
+ * Captures the assignments from the Leader message published to the admin channel.
+ *
+ * @return the bucket assignments
+ */
+ protected BucketAssignments captureAssignments() {
+ Leader msg = captureAdminMessage(Leader.class);
+
+ BucketAssignments asgn = msg.getAssignments();
+ assertNotNull(asgn);
+ return asgn;
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
+ return captureAdminMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the admin channel.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the message that was published
+ */
+ protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
+ return clazz.cast(admin.get(index));
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
+ return capturePublishedMessage(clazz, 0);
+ }
+
+ /**
+ * Captures the message published to the non-admin channels.
+ *
+ * @param clazz type of {@link Message} to capture
+ * @param index index of the item to be captured
+ * @return the (channel,message) pair that was published
+ */
+ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
+ Pair<String, Message> msg = published.get(index);
+ return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));
+ }
+}