aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src/main/java/org')
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java34
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java397
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java54
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java54
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java133
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java647
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java150
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java124
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java233
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java205
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java52
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java41
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java70
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java79
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java68
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java41
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java40
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java270
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java34
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java81
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java398
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java204
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java99
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java373
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java37
25 files changed, 3918 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java
new file mode 100644
index 00000000..4e9112e6
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+/**
+ * A scheduled task that can be cancelled.
+ */
+@FunctionalInterface
+public interface CancellableScheduledTask {
+
+ /**
+ * Cancels the scheduled task.
+ */
+ void cancel();
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
new file mode 100644
index 00000000..6411dd81
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -0,0 +1,397 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.properties.SpecProperties;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.features.DroolsControllerFeatureApi;
+import org.onap.policy.drools.features.PolicyControllerFeatureApi;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyControllerConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.util.FeatureEnabledChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controller/session pooling. Multiple hosts may be launched, all servicing the same
+ * controllers/sessions. When this feature is enabled, the requests are divided across the different
+ * hosts, instead of all running on a single, active host.
+ *
+ * <p>With each controller, there is an
+ * associated DMaaP topic that is used for internal communication between the different hosts
+ * serving the controller.
+ */
+public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
+
+ /**
+ * ID of this host.
+ */
+ @Getter
+ private final String host;
+
+ /**
+ * Entire set of feature properties, including those specific to various controllers.
+ */
+ private Properties featProps = null;
+
+ /**
+ * Maps a controller name to its associated manager.
+ */
+ private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+ /**
+ * Decremented each time a manager enters the Active state. Used by junit tests.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private final CountDownLatch activeLatch = new CountDownLatch(1);
+
+ /**
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
+ */
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
+
+ /**
+ * Constructor.
+ */
+ public PoolingFeature() {
+ super();
+
+ this.host = UUID.randomUUID().toString();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 0;
+ }
+
+ @Override
+ public boolean beforeStart(PolicyEngine engine) {
+ logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
+ featProps = getProperties(PoolingProperties.FEATURE_NAME);
+
+ // remove any generic pooling topic - always use controller-specific property
+ featProps.remove(PoolingProperties.POOLING_TOPIC);
+
+ initTopicSources(featProps);
+ initTopicSinks(featProps);
+
+ return false;
+ }
+
+ @Override
+ public boolean beforeStart(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeStart();
+ return false;
+ });
+ }
+
+ /**
+ * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
+ *
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ @Override
+ public boolean afterCreate(PolicyController controller) {
+
+ if (featProps == null) {
+ logger.error("pooling feature properties have not been loaded");
+ throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
+ }
+
+ String name = controller.getName();
+
+ var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
+
+ if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
+ try {
+ // get & validate the properties
+ var props = new PoolingProperties(name, featProps);
+
+ logger.info("pooling enabled for {}", name);
+ ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
+
+ } catch (PropertyException e) {
+ logger.error("pooling disabled due to exception for {}", name);
+ throw new PoolingFeatureRtException(e);
+ }
+
+ } else {
+ logger.info("pooling disabled for {}", name);
+ }
+
+
+ return false;
+ }
+
+ @Override
+ public boolean afterStart(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.afterStart();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean beforeStop(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeStop();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterStop(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.afterStop();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterShutdown(PolicyController controller) {
+ return commonShutdown(controller);
+ }
+
+ @Override
+ public boolean afterHalt(PolicyController controller) {
+ return commonShutdown(controller);
+ }
+
+ private boolean commonShutdown(PolicyController controller) {
+ deleteManager(controller);
+ return false;
+ }
+
+ @Override
+ public boolean beforeLock(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeLock();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterUnlock(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.afterUnlock();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
+ /*
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
+ */
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ if (mgr.beforeOffer(topic2, event)) {
+ return true;
+ }
+
+ offerTopics.set(topic2);
+ return false;
+ }
+
+ @Override
+ public boolean beforeInsert(DroolsController droolsController, Object fact) {
+
+ String topic = offerTopics.get();
+ if (topic == null) {
+ logger.warn("missing arguments for feature-pooling-messages in beforeInsert");
+ return false;
+ }
+
+ PolicyController controller;
+ try {
+ controller = getController(droolsController);
+
+ } catch (IllegalArgumentException | IllegalStateException e) {
+ logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
+ droolsController.getArtifactId(), e);
+ return false;
+ }
+
+
+ if (controller == null) {
+ logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
+ droolsController.getArtifactId());
+ return false;
+ }
+
+ /*
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
+ */
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ return mgr.beforeInsert(topic, fact);
+ }
+
+ @Override
+ public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
+ boolean success) {
+
+ // clear any stored arguments
+ offerTopics.remove();
+
+ return false;
+ }
+
+ /**
+ * Executes a function using the manager associated with the controller. Catches any exceptions
+ * from the function and re-throws it as a runtime exception.
+ *
+ * @param controller controller
+ * @param func function to be executed
+ * @return {@code true} if the function handled the request, {@code false} otherwise
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ private boolean doManager(PolicyController controller, MgrFunc func) {
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ try {
+ return func.apply(mgr);
+
+ } catch (PoolingFeatureException e) {
+ throw new PoolingFeatureRtException(e);
+ }
+ }
+
+ /**
+ * Deletes the manager associated with a controller.
+ *
+ * @param controller controller
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ private void deleteManager(PolicyController controller) {
+
+ String name = controller.getName();
+ logger.info("remove feature-pooling-messages manager for {}", name);
+
+ ctlr2pool.remove(name);
+ }
+
+ /**
+ * Function that operates on a manager.
+ */
+ @FunctionalInterface
+ private static interface MgrFunc {
+
+ /**
+ * Apply.
+ *
+ * @param mgr manager
+ * @return {@code true} if the request was handled by the manager, {@code false} otherwise
+ * @throws PoolingFeatureException feature exception
+ */
+ boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
+ }
+
+ /*
+ * The remaining methods may be overridden by junit tests.
+ */
+
+ /**
+ * Get properties.
+ *
+ * @param featName feature name
+ * @return the properties for the specified feature
+ */
+ protected Properties getProperties(String featName) {
+ return SystemPersistenceConstants.getManager().getProperties(featName);
+ }
+
+ /**
+ * Makes a pooling manager for a controller.
+ *
+ * @param host name/uuid of this host
+ * @param controller controller
+ * @param props properties to use to configure the manager
+ * @param activeLatch decremented when the manager goes Active
+ * @return a new pooling manager
+ */
+ protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ return new PoolingManagerImpl(host, controller, props, activeLatch);
+ }
+
+ /**
+ * Gets the policy controller associated with a drools controller.
+ *
+ * @param droolsController drools controller
+ * @return the policy controller associated with a drools controller
+ */
+ protected PolicyController getController(DroolsController droolsController) {
+ return PolicyControllerConstants.getFactory().get(droolsController);
+ }
+
+ /**
+ * Initializes the topic sources.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sources
+ */
+ protected List<TopicSource> initTopicSources(Properties props) {
+ return TopicEndpointManager.getManager().addTopicSources(props);
+ }
+
+ /**
+ * Initializes the topic sinks.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sinks
+ */
+ protected List<TopicSink> initTopicSinks(Properties props) {
+ return TopicEndpointManager.getManager().addTopicSinks(props);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
new file mode 100644
index 00000000..5d7b9f76
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.Serial;
+
+/**
+ * Exception thrown by the pooling feature.
+ */
+public class PoolingFeatureException extends Exception {
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ public PoolingFeatureException() {
+ super();
+ }
+
+ public PoolingFeatureException(String message) {
+ super(message);
+ }
+
+ public PoolingFeatureException(Throwable cause) {
+ super(cause);
+ }
+
+ public PoolingFeatureException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PoolingFeatureException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java
new file mode 100644
index 00000000..5d0a2755
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.Serial;
+
+/**
+ * A runtime exception thrown by the pooling feature.
+ */
+public class PoolingFeatureRtException extends RuntimeException {
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ public PoolingFeatureRtException() {
+ super();
+ }
+
+ public PoolingFeatureRtException(String message) {
+ super(message);
+ }
+
+ public PoolingFeatureRtException(Throwable cause) {
+ super(cause);
+ }
+
+ public PoolingFeatureRtException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PoolingFeatureRtException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
new file mode 100644
index 00000000..5e358e61
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -0,0 +1,133 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+
+/**
+ * Pooling manager for a single PolicyController.
+ */
+public interface PoolingManager {
+
+ /**
+ * Gets the properties used to configure the manager.
+ *
+ * @return pooling properties
+ */
+ PoolingProperties getProperties();
+
+ /**
+ * Gets the host id.
+ *
+ * @return the host id
+ */
+ String getHost();
+
+ /**
+ * Gets the name of the internal DMaaP topic used by this manager to communicate with
+ * its other hosts.
+ *
+ * @return the name of the internal DMaaP topic
+ */
+ String getTopic();
+
+ /**
+ * Starts distributing requests according to the given bucket assignments.
+ *
+ * @param assignments must <i>not</i> be {@code null}
+ */
+ void startDistributing(BucketAssignments assignments);
+
+ /**
+ * Gets the current bucket assignments.
+ *
+ * @return the current bucket assignments, or {@code null} if no assignments have been
+ * made
+ */
+ BucketAssignments getAssignments();
+
+ /**
+ * Publishes a message to the internal topic on the administrative channel.
+ *
+ * @param msg message to be published
+ */
+ void publishAdmin(Message msg);
+
+ /**
+ * Publishes a message to the internal topic on a particular channel.
+ *
+ * @param channel channel on which the message should be published
+ * @param msg message to be published
+ */
+ void publish(String channel, Message msg);
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs delay, in milliseconds
+ * @param task task
+ * @return a new scheduled task
+ */
+ CancellableScheduledTask schedule(long delayMs, StateTimerTask task);
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs initial delay, in milliseconds
+ * @param delayMs delay, in milliseconds
+ * @param task task
+ * @return a new scheduled task
+ */
+ CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task);
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ State goStart();
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ State goQuery();
+
+ /**
+ * Transitions to the "active" state.
+ *
+ * @return the new state
+ */
+ State goActive();
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ State goInactive();
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
new file mode 100644
index 00000000..7c0436eb
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -0,0 +1,647 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import com.google.gson.JsonParseException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
+import org.onap.policy.drools.system.PolicyController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
+ * events coming from external topics are saved in a queue for later processing. Once
+ * assignments are made, the saved events are processed. In addition, while the controller
+ * is locked, events are still forwarded to other hosts and bucket assignments are still
+ * updated, based on any {@link Leader} messages that it receives.
+ */
+public class PoolingManagerImpl implements PoolingManager, TopicListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
+
+ /**
+ * Maximum number of times a message can be forwarded.
+ */
+ public static final int MAX_HOPS = 5;
+
+ /**
+ * ID of this host.
+ */
+ @Getter
+ private final String host;
+
+ /**
+ * Properties with which this was configured.
+ */
+ @Getter
+ private final PoolingProperties properties;
+
+ /**
+ * Associated controller.
+ */
+ private final PolicyController controller;
+
+ /**
+ * Decremented each time the manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch;
+
+ /**
+ * Used to encode & decode request objects received from & sent to a rule engine.
+ */
+ private final Serializer serializer;
+
+ /**
+ * Internal DMaaP topic used by this controller.
+ */
+ @Getter
+ private final String topic;
+
+ /**
+ * Manager for the internal DMaaP topic.
+ */
+ private final TopicMessageManager topicMessageManager;
+
+ /**
+ * Lock used while updating {@link #current}. In general, public methods must use
+ * this, while private methods assume the lock is already held.
+ */
+ private final Object curLocker = new Object();
+
+ /**
+ * Current state.
+ *
+ * <p>This uses a finite state machine, wherein the state object contains all of the data
+ * relevant to that state. Each state object has a process() method, specific to each
+ * type of {@link Message} subclass. The method returns the next state object, or
+ * {@code null} if the state is to remain the same.
+ */
+ private State current;
+
+ /**
+ * Current bucket assignments or {@code null}.
+ */
+ @Getter
+ private BucketAssignments assignments = null;
+
+ /**
+ * Pool used to execute timers.
+ */
+ private ScheduledThreadPoolExecutor scheduler = null;
+
+ /**
+ * Constructs the manager, initializing all the data structures.
+ *
+ * @param host name/uuid of this host
+ * @param controller controller with which this is associated
+ * @param props feature properties specific to the controller
+ * @param activeLatch latch to be decremented each time the manager enters the Active
+ * state
+ */
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ this.host = host;
+ this.controller = controller;
+ this.properties = props;
+ this.activeLatch = activeLatch;
+
+ try {
+ this.serializer = new Serializer();
+ this.topic = props.getPoolingTopic();
+ this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
+ this.current = new IdleState(this);
+
+ logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
+
+ } catch (ClassCastException e) {
+ logger.error("not a topic listener, controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+ }
+ }
+
+ /**
+ * Should only be used by junit tests.
+ *
+ * @return the current state
+ */
+ protected State getCurrent() {
+ synchronized (curLocker) {
+ return current;
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to start. Starts the publisher for the
+ * internal topic, and creates a thread pool for the timers.
+ */
+ public void beforeStart() {
+ synchronized (curLocker) {
+ if (scheduler == null) {
+ topicMessageManager.startPublisher();
+
+ logger.debug("make scheduler thread for topic {}", getTopic());
+ scheduler = makeScheduler();
+
+ /*
+ * Only a handful of timers at any moment, thus we can afford to take the
+ * time to remove them when they're cancelled.
+ */
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setMaximumPoolSize(1);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller has successfully started. Starts the consumer for the
+ * internal topic, enters the {@link StartState}, and sets the filter for the initial
+ * state.
+ */
+ public void afterStart() {
+ synchronized (curLocker) {
+ if (current instanceof IdleState) {
+ topicMessageManager.startConsumer(this);
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
+ * and the current state.
+ */
+ public void beforeStop() {
+ ScheduledThreadPoolExecutor sched;
+
+ synchronized (curLocker) {
+ sched = scheduler;
+ scheduler = null;
+
+ if (!(current instanceof IdleState)) {
+ changeState(new IdleState(this));
+ topicMessageManager.stopConsumer(this);
+ publishAdmin(new Offline(getHost()));
+ }
+
+ assignments = null;
+ }
+
+ if (sched != null) {
+ logger.debug("stop scheduler for topic {}", getTopic());
+ sched.shutdownNow();
+ }
+ }
+
+ /**
+ * Indicates that the controller has stopped. Stops the publisher and logs a warning
+ * if any events are still in the queue.
+ */
+ public void afterStop() {
+ synchronized (curLocker) {
+ /*
+ * stop the publisher, but allow time for any Offline message to be
+ * transmitted
+ */
+ topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to be locked. Enters the idle state, as all
+ * it will be doing is forwarding messages.
+ */
+ public void beforeLock() {
+ logger.info("locking manager for topic {}", getTopic());
+
+ synchronized (curLocker) {
+ changeState(new IdleState(this));
+ }
+ }
+
+ /**
+ * Indicates that the controller has been unlocked. Enters the start state, if the
+ * controller is running.
+ */
+ public void afterUnlock() {
+ logger.info("unlocking manager for topic {}", getTopic());
+
+ synchronized (curLocker) {
+ if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Changes the finite state machine to a new state, provided the new state is not
+ * {@code null}.
+ *
+ * @param newState new state, or {@code null} if to remain unchanged
+ */
+ private void changeState(State newState) {
+ if (newState != null) {
+ current.cancelTimers();
+ current = newState;
+
+ newState.start();
+ }
+ }
+
+ @Override
+ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return () -> fut.cancel(false);
+ }
+
+ @Override
+ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
+ TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return () -> fut.cancel(false);
+ }
+
+ @Override
+ public void publishAdmin(Message msg) {
+ publish(Message.ADMIN, msg);
+ }
+
+ @Override
+ public void publish(String channel, Message msg) {
+ logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
+
+ msg.setChannel(channel);
+
+ try {
+ // ensure it's valid before we send it
+ msg.checkValidity();
+
+ String txt = serializer.encodeMsg(msg);
+ topicMessageManager.publish(txt);
+
+ } catch (JsonParseException e) {
+ logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic.
+ *
+ * @param commType comm infrastructure
+ * @param topic2 topic
+ * @param event event
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
+
+ if (event == null) {
+ logger.error("null event on topic {}", topic);
+ return;
+ }
+
+ synchronized (curLocker) {
+ // it's on the internal topic
+ handleInternal(event);
+ }
+ }
+
+ /**
+ * Called by the PolicyController before it offers the event to the DroolsController.
+ * If the controller is locked, then it isn't processing events. However, they still
+ * need to be forwarded, thus in that case, they are decoded and forwarded.
+ *
+ * <p>On the other hand, if the controller is not locked, then we just return immediately
+ * and let {@link #beforeInsert(String, Object) beforeInsert()} handle
+ * it instead, as it already has the decoded message.
+ *
+ * @param topic2 topic
+ * @param event event
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ public boolean beforeOffer(String topic2, String event) {
+
+ if (!controller.isLocked()) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(topic2, decodeEvent(topic2, event));
+ }
+
+ /**
+ * Called by the DroolsController before it inserts the event into the rule engine.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param topic2 topic
+ * @param event event, as an object, or {@code null} if it cannot be decoded
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
+ */
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
+ return false;
+ }
+
+ synchronized (curLocker) {
+ return handleExternal(topic2, event, event.hashCode());
+ }
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
+ */
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
+ if (assignments == null) {
+ // no bucket assignments yet - handle locally
+ logger.info("handle event locally for request {}", event);
+
+ // we did NOT consume the event
+ return false;
+
+ } else {
+ return handleEvent(topic2, event, eventHashCode);
+ }
+ }
+
+ /**
+ * Handles a {@link Forward} event, possibly forwarding it again.
+ *
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
+ */
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
+
+ if (target == null) {
+ /*
+ * This bucket has no assignment - just discard the event
+ */
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
+ return true;
+ }
+
+ if (target.equals(host)) {
+ /*
+ * Message belongs to this host - allow the controller to handle it.
+ */
+ logger.info("handle local event for request {} from topic {}", event, topic2);
+ return false;
+ }
+
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
+ return true;
+ }
+
+ /**
+ * Decodes an event from a String into an event Object.
+ *
+ * @param topic2 topic
+ * @param event event
+ * @return the decoded event object, or {@code null} if it can't be decoded
+ */
+ private Object decodeEvent(String topic2, String event) {
+ DroolsController drools = controller.getDrools();
+
+ // check if this topic has a decoder
+
+ if (!canDecodeEvent(drools, topic2)) {
+
+ logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
+ drools.getArtifactId());
+ return null;
+ }
+
+ // decode
+
+ try {
+ return decodeEventWrapper(drools, topic2, event);
+
+ } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
+ logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic. This uses reflection to identify the
+ * appropriate process() method to invoke, based on the type of Message that was
+ * decoded.
+ *
+ * @param event the serialized {@link Message} read from the internal topic
+ */
+ private void handleInternal(String event) {
+ Class<?> clazz = null;
+
+ try {
+ Message msg = serializer.decodeMsg(event);
+
+ // get the class BEFORE checking the validity
+ clazz = msg.getClass();
+
+ msg.checkValidity();
+
+ var meth = current.getClass().getMethod("process", msg.getClass());
+ changeState((State) meth.invoke(current, msg));
+
+ } catch (JsonParseException e) {
+ logger.warn("failed to decode message for topic {}", topic, e);
+
+ } catch (NoSuchMethodException | SecurityException e) {
+ logger.error("no processor for message {} for topic {}", clazz, topic, e);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | PoolingFeatureException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+ }
+ }
+
+ @Override
+ public void startDistributing(BucketAssignments asgn) {
+ synchronized (curLocker) {
+ int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+ logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
+ assignments = asgn;
+ }
+ }
+
+ @Override
+ public State goStart() {
+ return new StartState(this);
+ }
+
+ @Override
+ public State goQuery() {
+ return new QueryState(this);
+ }
+
+ @Override
+ public State goActive() {
+ activeLatch.countDown();
+ return new ActiveState(this);
+ }
+
+ @Override
+ public State goInactive() {
+ return new InactiveState(this);
+ }
+
+ /**
+ * Action to run a timer task. Only runs the task if the machine is still in the state
+ * that it was in when the timer was created.
+ */
+ private class TimerAction implements Runnable {
+
+ /**
+ * State of the machine when the timer was created.
+ */
+ private State origState;
+
+ /**
+ * Task to be executed.
+ */
+ private StateTimerTask task;
+
+ /**
+ * Constructor.
+ *
+ * @param task task to execute when this timer runs
+ */
+ public TimerAction(StateTimerTask task) {
+ this.origState = current;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ synchronized (curLocker) {
+ if (current == origState) {
+ changeState(task.fire());
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a DMaaP manager.
+ *
+ * @param topic name of the internal DMaaP topic
+ * @return a new topic messages manager
+ * @throws PoolingFeatureException if an error occurs
+ */
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManager(topic);
+ }
+
+ /**
+ * Creates a scheduled thread pool.
+ *
+ * @return a new scheduled thread pool
+ */
+ protected ScheduledThreadPoolExecutor makeScheduler() {
+ return new ScheduledThreadPoolExecutor(1);
+ }
+
+ /**
+ * Determines if the event can be decoded.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @return {@code true} if the event can be decoded, {@code false} otherwise
+ */
+ protected boolean canDecodeEvent(DroolsController drools, String topic) {
+ return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
+ topic);
+ }
+
+ /**
+ * Decodes the event.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @param event event text to be decoded
+ * @return the decoded event
+ * @throws IllegalArgumentException illegal argument
+ * @throws UnsupportedOperationException unsupported operation
+ * @throws IllegalStateException illegal state
+ */
+ protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
+ return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,
+ event);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
new file mode 100644
index 00000000..fd1c3d3d
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -0,0 +1,150 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.SpecProperties;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+/**
+ * Properties used by the pooling feature, specific to a controller.
+ */
+@Getter
+@Setter
+public class PoolingProperties {
+
+ /**
+ * The feature name, used to retrieve properties.
+ */
+ public static final String FEATURE_NAME = "feature-pooling-messages";
+
+ /**
+ * Feature properties all begin with this prefix.
+ */
+ public static final String PREFIX = "pooling.";
+
+ public static final String FEATURE_ENABLED = PREFIX + "enabled";
+ public static final String POOLING_TOPIC = PREFIX + "topic";
+ public static final String OFFLINE_LIMIT = PREFIX + "offline.queue.limit";
+ public static final String OFFLINE_AGE_MS = PREFIX + "offline.queue.age.milliseconds";
+ public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "offline.publish.wait.milliseconds";
+ public static final String START_HEARTBEAT_MS = PREFIX + "start.heartbeat.milliseconds";
+ public static final String REACTIVATE_MS = PREFIX + "reactivate.milliseconds";
+ public static final String IDENTIFICATION_MS = PREFIX + "identification.milliseconds";
+ public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "active.heartbeat.milliseconds";
+ public static final String INTER_HEARTBEAT_MS = PREFIX + "inter.heartbeat.milliseconds";
+
+ /**
+ * Type of item that the extractors will be extracting.
+ */
+ public static final String EXTRACTOR_TYPE = "requestId";
+
+ /**
+ * Prefix for extractor properties.
+ */
+ public static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
+
+ /**
+ * Properties from which this was constructed.
+ */
+ private Properties source;
+
+ /**
+ * Topic used for inter-host communication.
+ */
+ @Property(name = POOLING_TOPIC)
+ private String poolingTopic;
+
+ /**
+ * Maximum number of events to retain in the queue while waiting for
+ * buckets to be assigned.
+ */
+ @Property(name = OFFLINE_LIMIT, defaultValue = "1000")
+ private int offlineLimit;
+
+ /**
+ * Maximum age, in milliseconds, of events to be retained in the queue.
+ * Events older than this are discarded.
+ */
+ @Property(name = OFFLINE_AGE_MS, defaultValue = "60000")
+ private long offlineAgeMs;
+
+ /**
+ * Time, in milliseconds, to wait for an "Offline" message to be published
+ * to topic.
+ */
+ @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+ private long offlinePubWaitMs;
+
+ /**
+ * Time, in milliseconds, to wait for this host's heart beat during the
+ * start-up state.
+ */
+ @Property(name = START_HEARTBEAT_MS, defaultValue = "100000")
+ private long startHeartbeatMs;
+
+ /**
+ * Time, in milliseconds, to wait before attempting to reactivate this
+ * host when it has no bucket assignments.
+ */
+ @Property(name = REACTIVATE_MS, defaultValue = "50000")
+ private long reactivateMs;
+
+ /**
+ * Time, in milliseconds, to wait for all Identification messages to
+ * arrive during the query state.
+ */
+ @Property(name = IDENTIFICATION_MS, defaultValue = "50000")
+ private long identificationMs;
+
+ /**
+ * Time, in milliseconds, to wait for heart beats from this host, or its
+ * predecessor, during the active state.
+ */
+ @Property(name = ACTIVE_HEARTBEAT_MS, defaultValue = "50000")
+ private long activeHeartbeatMs;
+
+ /**
+ * Time, in milliseconds, to wait between heart beat generations during
+ * the active and start-up states.
+ */
+ @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
+ private long interHeartbeatMs;
+
+ /**
+ * Constructor.
+ *
+ * @param controllerName the name of the controller
+ * @param props set of properties used to configure this
+ * @throws PropertyException if an error occurs
+ *
+ */
+ public PoolingProperties(String controllerName, Properties props) throws PropertyException {
+ source = props;
+
+ new BeanConfigurator().configureFromProperties(this, new SpecProperties(PREFIX, controllerName, props));
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java
new file mode 100644
index 00000000..2894b1da
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import java.util.HashMap;
+import java.util.Map;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * Serialization helper functions.
+ */
+public class Serializer {
+
+ /**
+ * The message type is stored in fields of this name within the JSON.
+ */
+ private static final String TYPE_FIELD = "type";
+
+ /**
+ * Used to encode & decode JSON messages sent & received, respectively, on the
+ * internal topic.
+ */
+ private final Gson gson = new Gson();
+
+ /**
+ * Maps a message subclass to its type.
+ */
+ private static final Map<Class<? extends Message>, String> class2type = new HashMap<>();
+
+ /**
+ * Maps a message type to the appropriate subclass.
+ */
+ private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
+
+ static {
+ class2type.put(Heartbeat.class, "heartbeat");
+ class2type.put(Identification.class, "identification");
+ class2type.put(Leader.class, "leader");
+ class2type.put(Offline.class, "offline");
+ class2type.put(Query.class, "query");
+
+ class2type.forEach((clazz, type) -> type2class.put(type, clazz));
+ }
+
+ /**
+ * Encodes a filter.
+ *
+ * @param filter filter to be encoded
+ * @return the filter, serialized as a JSON string
+ */
+ public String encodeFilter(Map<String, Object> filter) {
+ return gson.toJson(filter);
+ }
+
+ /**
+ * Encodes a message.
+ *
+ * @param msg message to be encoded
+ * @return the message, serialized as a JSON string
+ */
+ public String encodeMsg(Message msg) {
+ JsonElement jsonEl = gson.toJsonTree(msg);
+
+ String type = class2type.get(msg.getClass());
+ if (type == null) {
+ throw new JsonParseException("cannot serialize " + msg.getClass());
+ }
+
+ jsonEl.getAsJsonObject().addProperty(TYPE_FIELD, type);
+
+ return gson.toJson(jsonEl);
+ }
+
+ /**
+ * Decodes a JSON string into a Message.
+ *
+ * @param msg JSON string representing the message
+ * @return the message
+ */
+ public Message decodeMsg(String msg) {
+ JsonElement jsonEl = gson.fromJson(msg, JsonElement.class);
+
+ JsonElement typeEl = jsonEl.getAsJsonObject().get(TYPE_FIELD);
+ if (typeEl == null) {
+ throw new JsonParseException("cannot deserialize " + Message.class
+ + " because it does not contain a field named " + TYPE_FIELD);
+
+ }
+
+ Class<? extends Message> clazz = type2class.get(typeEl.getAsString());
+ if (clazz == null) {
+ throw new JsonParseException("cannot deserialize " + typeEl);
+ }
+
+ return gson.fromJson(jsonEl, clazz);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java
new file mode 100644
index 00000000..4c9b3f34
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.List;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the internal topic. Assumes all topics are managed by
+ * {@link TopicEndpoint}.
+ */
+public class TopicMessageManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(TopicMessageManager.class);
+
+ /**
+ * Name of the topic.
+ */
+ @Getter
+ private final String topic;
+
+ /**
+ * Topic source whose filter is to be manipulated.
+ */
+ private final TopicSource topicSource;
+
+ /**
+ * Where to publish messages.
+ */
+ private final TopicSink topicSink;
+
+ /**
+ * {@code True} if the consumer is running, {@code false} otherwise.
+ */
+ private boolean consuming = false;
+
+ /**
+ * {@code True} if the publisher is running, {@code false} otherwise.
+ */
+ private boolean publishing = false;
+
+ /**
+ * Constructs the manager, but does not start the source or sink.
+ *
+ * @param topic name of the internal topic
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public TopicMessageManager(String topic) throws PoolingFeatureException {
+
+ logger.info("initializing bus for topic {}", topic);
+
+ try {
+ this.topic = topic;
+
+ this.topicSource = findTopicSource();
+ this.topicSink = findTopicSink();
+
+ } catch (IllegalArgumentException e) {
+ logger.error("failed to attach to topic {}", topic);
+ throw new PoolingFeatureException(e);
+ }
+ }
+
+ /**
+ * Finds the topic source associated with the internal topic.
+ *
+ * @return the topic source
+ * @throws PoolingFeatureException if the source doesn't exist or is not filterable
+ */
+ private TopicSource findTopicSource() throws PoolingFeatureException {
+ for (TopicSource src : getTopicSources()) {
+ if (topic.equals(src.getTopic())) {
+ return src;
+ }
+ }
+
+ throw new PoolingFeatureException("missing topic source " + topic);
+ }
+
+ /**
+ * Finds the topic sink associated with the internal topic.
+ *
+ * @return the topic sink
+ * @throws PoolingFeatureException if the sink doesn't exist
+ */
+ private TopicSink findTopicSink() throws PoolingFeatureException {
+ for (TopicSink sink : getTopicSinks()) {
+ if (topic.equals(sink.getTopic())) {
+ return sink;
+ }
+ }
+
+ throw new PoolingFeatureException("missing topic sink " + topic);
+ }
+
+ /**
+ * Starts the publisher, if it isn't already running.
+ */
+ public void startPublisher() {
+ if (publishing) {
+ return;
+ }
+
+ logger.info("start publishing to topic {}", topic);
+ publishing = true;
+ }
+
+ /**
+ * Stops the publisher.
+ *
+ * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
+ * close
+ */
+ public void stopPublisher(long waitMs) {
+ if (!publishing) {
+ return;
+ }
+
+ /*
+ * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
+ * could be passed to sink.stop(), but that isn't an option at this time.
+ */
+ try {
+ Thread.sleep(waitMs);
+
+ } catch (InterruptedException e) {
+ logger.warn("message transmission stopped due to {}", e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+
+ logger.info("stop publishing to topic {}", topic);
+ publishing = false;
+ }
+
+ /**
+ * Starts the consumer, if it isn't already running.
+ *
+ * @param listener listener to register with the source
+ */
+ public void startConsumer(TopicListener listener) {
+ if (consuming) {
+ return;
+ }
+
+ logger.info("start consuming from topic {}", topic);
+ topicSource.register(listener);
+ consuming = true;
+ }
+
+ /**
+ * Stops the consumer.
+ *
+ * @param listener listener to unregister with the source
+ */
+ public void stopConsumer(TopicListener listener) {
+ if (!consuming) {
+ return;
+ }
+
+ logger.info("stop consuming from topic {}", topic);
+ consuming = false;
+ topicSource.unregister(listener);
+ }
+
+ /**
+ * Publishes a message to the sink.
+ *
+ * @param msg message to be published
+ * @throws PoolingFeatureException if an error occurs or the publisher isn't running
+ */
+ public void publish(String msg) throws PoolingFeatureException {
+ if (!publishing) {
+ throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
+ }
+
+ try {
+ if (!topicSink.send(msg)) {
+ throw new PoolingFeatureException("failed to send to topic sink " + topic);
+ }
+
+ } catch (IllegalStateException e) {
+ throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
+ }
+ }
+
+ /*
+ * The remaining methods may be overridden by junit tests.
+ */
+
+ /**
+ * Get topic source.
+ *
+ * @return the topic sources
+ */
+ protected List<TopicSource> getTopicSources() {
+ return TopicEndpointManager.getManager().getTopicSources();
+ }
+
+ /**
+ * Get topic sinks.
+ *
+ * @return the topic sinks
+ */
+ protected List<TopicSink> getTopicSinks() {
+ return TopicEndpointManager.getManager().getTopicSinks();
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
new file mode 100644
index 00000000..485a9d2e
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -0,0 +1,205 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bucket assignments, which is simply an array of host names.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class BucketAssignments {
+
+ private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class);
+
+ /**
+ * The number of bits in the maximum number of buckets.
+ */
+ private static final int MAX_BUCKET_BITS = 10;
+
+ /**
+ * Maximum number of buckets. Must be a power of two.
+ */
+ public static final int MAX_BUCKETS = 1 << MAX_BUCKET_BITS;
+
+ /**
+ * Used to ensure that a hash code is not negative.
+ */
+ private static final int MAX_BUCKETS_MASK = MAX_BUCKETS - 1;
+
+ /**
+ * Identifies the host serving a particular bucket.
+ */
+ private String[] hostArray = null;
+
+
+ /**
+ * Constructor.
+ *
+ * @param hostArray maps a bucket number (i.e., array index) to a host. All values
+ * must be non-null
+ */
+ public BucketAssignments(String[] hostArray) {
+ this.hostArray = hostArray;
+ }
+
+ /**
+ * Gets the leader, which is the host with the minimum UUID.
+ *
+ * @return the assignment leader
+ */
+ public String getLeader() {
+ if (hostArray == null) {
+ return null;
+ }
+
+ String leader = null;
+
+ for (String host : hostArray) {
+ if (host != null && (leader == null || host.compareTo(leader) < 0)) {
+ leader = host;
+ }
+ }
+
+ return leader;
+
+ }
+
+ /**
+ * Determines if a host has an assignment.
+ *
+ * @param host host to be checked
+ * @return {@code true} if the host has an assignment, {@code false} otherwise
+ */
+ public boolean hasAssignment(String host) {
+ if (hostArray == null) {
+ return false;
+ }
+
+ for (String host2 : hostArray) {
+ if (host.equals(host2)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Gets all the hosts that have an assignment.
+ *
+ * @return all the hosts that have an assignment
+ */
+ public Set<String> getAllHosts() {
+ Set<String> set = new HashSet<>();
+ if (hostArray == null) {
+ return set;
+ }
+
+ for (String host : hostArray) {
+ if (host != null) {
+ set.add(host);
+ }
+ }
+
+ return set;
+ }
+
+ /**
+ * Gets the host assigned to a given bucket.
+ *
+ * @param hashCode hash code of the item whose assignment is desired
+ * @return the assigned host, or {@code null} if the item has no assigned host
+ */
+ public String getAssignedHost(int hashCode) {
+ if (hostArray == null || hostArray.length == 0) {
+ logger.error("no buckets have been assigned");
+ return null;
+ }
+
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
+ }
+
+ /**
+ * Gets the number of buckets.
+ *
+ * @return the number of buckets
+ */
+ public int size() {
+ return (hostArray != null ? hostArray.length : 0);
+ }
+
+ /**
+ * Checks the validity of the assignments, verifying that all buckets have been
+ * assigned to a host.
+ *
+ * @throws PoolingFeatureException if the assignments are invalid
+ */
+ public void checkValidity() throws PoolingFeatureException {
+ if (hostArray == null || hostArray.length == 0) {
+ throw new PoolingFeatureException("missing hosts in message bucket assignments");
+ }
+
+ if (hostArray.length > MAX_BUCKETS) {
+ throw new PoolingFeatureException("too many hosts in message bucket assignments");
+ }
+
+ for (var x = 0; x < hostArray.length; ++x) {
+ if (hostArray[x] == null) {
+ throw new PoolingFeatureException("bucket " + x + " has no assignment");
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final var prime = 31;
+ var result = 1;
+ result = prime * result + Arrays.hashCode(hostArray);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BucketAssignments other = (BucketAssignments) obj;
+ return Arrays.equals(hostArray, other.hostArray);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
new file mode 100644
index 00000000..4a8bdc3b
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Heart beat message sent to self, or to the succeeding host.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class Heartbeat extends Message {
+
+ /**
+ * Time, in milliseconds, when this was created.
+ */
+ private long timestampMs;
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ * @param timestampMs time, in milliseconds, associated with the message
+ */
+ public Heartbeat(String source, long timestampMs) {
+ super(source);
+
+ this.timestampMs = timestampMs;
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
new file mode 100644
index 00000000..b8fd8414
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.NoArgsConstructor;
+
+/**
+ * Identifies the source host and the bucket assignments which it knows about.
+ */
+@NoArgsConstructor
+public class Identification extends MessageWithAssignments {
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ * @param assignments assignments
+ */
+ public Identification(String source, BucketAssignments assignments) {
+ super(source, assignments);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
new file mode 100644
index 00000000..10c33382
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.NoArgsConstructor;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+
+/**
+ * Indicates that the "source" of this message is now the "lead" host.
+ */
+@NoArgsConstructor
+public class Leader extends MessageWithAssignments {
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ * @param assignments assignments
+ */
+ public Leader(String source, BucketAssignments assignments) {
+ super(source, assignments);
+ }
+
+ /**
+ * Also verifies that buckets have been assigned and that the source is
+ * indeed the leader.
+ */
+ @Override
+ public void checkValidity() throws PoolingFeatureException {
+
+ super.checkValidity();
+
+ BucketAssignments assignments = getAssignments();
+ if (assignments == null) {
+ throw new PoolingFeatureException("missing message bucket assignments");
+ }
+
+ String leader = getSource();
+
+ if (!assignments.hasAssignment(leader)) {
+ throw new PoolingFeatureException("leader " + leader + " has no bucket assignments");
+ }
+
+ for (String host : assignments.getHostArray()) {
+ if (host.compareTo(leader) < 0) {
+ throw new PoolingFeatureException("invalid leader " + leader + ", should be " + host);
+ }
+ }
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java
new file mode 100644
index 00000000..bb973142
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+
+/**
+ * Messages sent on the internal topic.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class Message {
+
+ /**
+ * Name of the administrative channel.
+ */
+ public static final String ADMIN = "_admin";
+
+ /**
+ * Host that originated the message.
+ */
+ private String source;
+
+ /**
+ * Channel on which the message is routed, which is either the target host
+ * or {@link #ADMIN}.
+ */
+ private String channel;
+
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ */
+ public Message(String source) {
+ this.source = source;
+ }
+
+ /**
+ * Checks the validity of the message, including verifying that required
+ * fields are not missing.
+ *
+ * @throws PoolingFeatureException if the message is invalid
+ */
+ public void checkValidity() throws PoolingFeatureException {
+ if (source == null || source.isEmpty()) {
+ throw new PoolingFeatureException("missing message source");
+ }
+
+ if (channel == null || channel.isEmpty()) {
+ throw new PoolingFeatureException("missing message channel");
+ }
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
new file mode 100644
index 00000000..b8521dd8
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+
+/**
+ * A Message that includes bucket assignments.
+ */
+@Setter
+@Getter
+@NoArgsConstructor
+public class MessageWithAssignments extends Message {
+
+ /**
+ * Bucket assignments, as known by the source host.
+ */
+ private BucketAssignments assignments;
+
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ * @param assignments assignments
+ */
+ public MessageWithAssignments(String source, BucketAssignments assignments) {
+ super(source);
+
+ this.assignments = assignments;
+ }
+
+ /**
+ * If there are any assignments, it verifies there validity.
+ */
+ @Override
+ public void checkValidity() throws PoolingFeatureException {
+
+ super.checkValidity();
+
+ if (assignments != null) {
+ assignments.checkValidity();
+ }
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
new file mode 100644
index 00000000..3ff7bf1e
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.NoArgsConstructor;
+
+/**
+ * Indicates that the source host is going offline and will be unable to process
+ * any further requests.
+ */
+@NoArgsConstructor
+public class Offline extends Message {
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ */
+ public Offline(String source) {
+ super(source);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java
new file mode 100644
index 00000000..4c856bb7
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import lombok.NoArgsConstructor;
+
+/**
+ * Query the other hosts for their identification.
+ */
+@NoArgsConstructor
+public class Query extends Message {
+
+ /**
+ * Constructor.
+ *
+ * @param source host on which the message originated
+ */
+ public Query(String source) {
+ super(source);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
new file mode 100644
index 00000000..cafcb45b
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -0,0 +1,270 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Arrays;
+import java.util.TreeSet;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The active state. In this state, this host has one more bucket assignments and
+ * processes any events associated with one of its buckets. Other events are forwarded to
+ * appropriate target hosts.
+ */
+@Getter(AccessLevel.PROTECTED)
+public class ActiveState extends ProcessingState {
+
+ private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
+
+ /**
+ * Set of hosts that have been assigned a bucket.
+ */
+ @Getter(AccessLevel.NONE)
+ private final TreeSet<String> assigned = new TreeSet<>();
+
+ /**
+ * Host that comes after this host, or {@code null} if it has no successor.
+ */
+ private String succHost = null;
+
+ /**
+ * Host that comes before this host, or "" if it has no predecessor.
+ */
+ private String predHost = "";
+
+ /**
+ * {@code True} if we saw this host's heart beat since the last check, {@code false}
+ * otherwise.
+ */
+ private boolean myHeartbeatSeen = false;
+
+ /**
+ * {@code True} if we saw the predecessor's heart beat since the last check,
+ * {@code false} otherwise.
+ */
+ private boolean predHeartbeatSeen = false;
+
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ */
+ public ActiveState(PoolingManager mgr) {
+ super(mgr, mgr.getAssignments().getLeader());
+
+ assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
+
+ detmNeighbors();
+ }
+
+ /**
+ * Determine this host's neighbors based on the order of the host UUIDs. Updates
+ * {@link #succHost} and {@link #predHost}.
+ */
+ private void detmNeighbors() {
+ if (assigned.size() < 2) {
+ logger.info("this host has no neighbors on topic {}", getTopic());
+ /*
+ * this host is the only one with any assignments - it has no neighbors
+ */
+ succHost = null;
+ predHost = "";
+ return;
+ }
+
+ if ((succHost = assigned.higher(getHost())) == null) {
+ // wrapped around - successor is the first host in the set
+ succHost = assigned.first();
+ }
+ logger.info("this host's successor is {} on topic {}", succHost, getTopic());
+
+ if ((predHost = assigned.lower(getHost())) == null) {
+ // wrapped around - predecessor is the last host in the set
+ predHost = assigned.last();
+ }
+ logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ addTimers();
+ genHeartbeat();
+ }
+
+ /**
+ * Adds the timers.
+ */
+ private void addTimers() {
+ logger.info("add timers");
+
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, () -> {
+ genHeartbeat();
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
+ long waitMs = getProperties().getActiveHeartbeatMs();
+
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
+ if (myHeartbeatSeen) {
+ myHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed my heart beat
+ logger.error("missed my heartbeat on topic {}", getTopic());
+
+ return missedHeartbeat();
+ });
+
+ /*
+ * predecessor heart beat checker
+ */
+ if (!predHost.isEmpty()) {
+
+ scheduleWithFixedDelay(waitMs, waitMs, () -> {
+ if (predHeartbeatSeen) {
+ predHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed the predecessor's heart beat
+ logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
+
+ publish(makeQuery());
+
+ return goQuery();
+ });
+ }
+ }
+
+ /**
+ * Generates a heart beat for this host and its successor.
+ */
+ private void genHeartbeat() {
+ var msg = makeHeartbeat(System.currentTimeMillis());
+ publish(getHost(), msg);
+
+ if (succHost != null) {
+ publish(succHost, msg);
+ }
+ }
+
+ @Override
+ public State process(Heartbeat msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ logger.warn("Heartbeat message has no source on topic {}", getTopic());
+
+ } else if (src.equals(getHost())) {
+ logger.info("saw my heartbeat on topic {}", getTopic());
+ myHeartbeatSeen = true;
+
+ } else if (src.equals(predHost)) {
+ logger.info("saw heartbeat from {} on topic {}", src, getTopic());
+ predHeartbeatSeen = true;
+
+ } else {
+ logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
+ }
+
+ return null;
+ }
+
+ @Override
+ public State process(Leader msg) {
+ if (!isValid(msg)) {
+ return null;
+ }
+
+ String src = msg.getSource();
+
+ if (getHost().compareTo(src) < 0) {
+ // our host would be a better leader - find out what's up
+ logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
+ return goQuery();
+ }
+
+ logger.info("have a new leader {} on topic {}", src, getTopic());
+
+ return goActive(msg.getAssignments());
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ logger.warn("Offline message has no source on topic {}", getTopic());
+ return null;
+
+ } else if (!assigned.contains(src)) {
+ /*
+ * the offline host wasn't assigned any buckets, so just ignore the message
+ */
+ logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
+ return null;
+
+ } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
+ /*
+ * Case 1: We are the leader.
+ *
+ * Case 2: Our predecessor was the leader, and it has gone offline - we should
+ * become the leader.
+ *
+ * In either case, we are now the leader, and we must re-balance the buckets
+ * since one of the hosts has gone offline.
+ */
+
+ logger.info("Offline message from source {} on topic {}", src, getTopic());
+
+ assigned.remove(src);
+
+ return becomeLeader(assigned);
+
+ } else {
+ /*
+ * Otherwise, we don't care right now - we'll wait for the leader to tell us
+ * it's been removed.
+ */
+ logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
+ return null;
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
new file mode 100644
index 00000000..06418280
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+
+/**
+ * Idle state, used when offline.
+ */
+public class IdleState extends State {
+
+ public IdleState(PoolingManager mgr) {
+ super(mgr);
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
new file mode 100644
index 00000000..7fc220a0
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The inactive state. In this state, we just wait a bit and then try to re-activate. In
+ * the meantime, all messages are ignored.
+ */
+public class InactiveState extends State {
+
+ private static final Logger logger = LoggerFactory.getLogger(InactiveState.class);
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ */
+ public InactiveState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ schedule(getProperties().getReactivateMs(), this::goStart);
+ }
+
+ @Override
+ public State process(Leader msg) {
+ if (isValid(msg)) {
+ logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
+ return goActive(msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * Generates an Identification message and goes to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
+ }
+
+ /**
+ * Remains in this state, without resetting any timers.
+ */
+ @Override
+ protected State goInactive() {
+ return null;
+ }
+
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
new file mode 100644
index 00000000..76914b75
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -0,0 +1,398 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Any state in which events are being processed locally and forwarded, as appropriate.
+ */
+@Setter
+@Getter
+public class ProcessingState extends State {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
+
+ /**
+ * Current known leader, never {@code null}.
+ */
+ @NonNull
+ private String leader;
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ * @param leader current known leader, which need not be the same as the assignment
+ * leader. Never {@code null}
+ * @throws IllegalArgumentException if an argument is invalid
+ */
+ public ProcessingState(PoolingManager mgr, @NonNull String leader) {
+ super(mgr);
+
+ BucketAssignments assignments = mgr.getAssignments();
+
+ if (assignments != null) {
+ String[] arr = assignments.getHostArray();
+ if (arr != null && arr.length == 0) {
+ throw new IllegalArgumentException("zero-length bucket assignments");
+ }
+ }
+
+ this.leader = leader;
+ }
+
+ /**
+ * Generates an Identification message and goes to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
+ }
+
+ /**
+ * Sets the assignments.
+ *
+ * @param assignments new assignments, or {@code null}
+ */
+ protected final void setAssignments(BucketAssignments assignments) {
+ if (assignments != null) {
+ startDistributing(assignments);
+ }
+ }
+
+ /**
+ * Determines if this host is the leader, based on the current assignments.
+ *
+ * @return {@code true} if this host is the leader, {@code false} otherwise
+ */
+ public boolean isLeader() {
+ return getHost().equals(leader);
+ }
+
+ /**
+ * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return the new state
+ */
+ protected State becomeLeader(SortedSet<String> alive) {
+ String newLeader = getHost();
+
+ if (!newLeader.equals(alive.first())) {
+ throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
+ }
+
+ var msg = makeLeader(alive);
+ logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
+
+ publish(msg);
+
+ return goActive(msg.getAssignments());
+ }
+
+ /**
+ * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
+ * first host in the set of hosts that are still alive.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new message
+ */
+ private Leader makeLeader(Set<String> alive) {
+ return new Leader(getHost(), makeAssignments(alive));
+ }
+
+ /**
+ * Makes a set of bucket assignments. Assumes "this" host is the leader.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new set of bucket assignments
+ */
+ private BucketAssignments makeAssignments(Set<String> alive) {
+
+ // make a working array from the CURRENT assignments
+ String[] bucket2host = makeBucketArray();
+
+ TreeSet<String> avail = new TreeSet<>(alive);
+
+ // if we have more hosts than buckets, then remove the extra hosts
+ removeExcessHosts(bucket2host.length, avail);
+
+ // create a host bucket for each available host
+ Map<String, HostBucket> host2hb = new HashMap<>();
+ avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
+
+ // add bucket indices to the appropriate host bucket
+ addIndicesToHostBuckets(bucket2host, host2hb);
+
+ // convert the collection back to an array
+ fillArray(host2hb.values(), bucket2host);
+
+ // update bucket2host with new assignments
+ rebalanceBuckets(host2hb.values(), bucket2host);
+
+ return new BucketAssignments(bucket2host);
+ }
+
+ /**
+ * Makes a bucket array, copying the current assignments, if available.
+ *
+ * @return a new bucket array
+ */
+ private String[] makeBucketArray() {
+ BucketAssignments asgn = getAssignments();
+ if (asgn == null) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ String[] oldArray = asgn.getHostArray();
+ if (oldArray.length == 0) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ var newArray = new String[oldArray.length];
+ System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
+
+ return newArray;
+ }
+
+ /**
+ * Removes excess hosts from the set of available hosts. Assumes "this" host is the
+ * leader, and thus appears as the first host in the set.
+ *
+ * @param maxHosts maximum number of hosts to be retained
+ * @param avail available hosts
+ */
+ private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
+ while (avail.size() > maxHosts) {
+ /*
+ * Don't remove this host, as it's the leader. Since the leader is always at
+ * the front of the sorted set, we'll just pick off hosts from the back of the
+ * set.
+ */
+ String host = avail.last();
+ avail.remove(host);
+
+ logger.warn("not using extra host {} for topic {}", host, getTopic());
+ }
+ }
+
+ /**
+ * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
+ * assigned to a host that does not appear within the map are re-assigned to a host
+ * that appears within the map.
+ *
+ * @param bucket2host bucket assignments
+ * @param host2data maps a host name to its {@link HostBucket}
+ */
+ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
+ LinkedList<Integer> nullBuckets = new LinkedList<>();
+
+ for (var x = 0; x < bucket2host.length; ++x) {
+ String host = bucket2host[x];
+ if (host == null) {
+ nullBuckets.add(x);
+
+ } else {
+ HostBucket hb = host2data.get(host);
+ if (hb == null) {
+ nullBuckets.add(x);
+
+ } else {
+ hb.add(x);
+ }
+ }
+ }
+
+ // assign the null buckets to other hosts
+ assignNullBuckets(nullBuckets, host2data.values());
+ }
+
+ /**
+ * Assigns null buckets (i.e., those having no assignment) to available hosts.
+ *
+ * @param buckets buckets that still need to be assigned to hosts
+ * @param coll collection of current host-bucket assignments
+ */
+ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
+ // assign null buckets to the hosts with the fewest buckets
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (Integer index : buckets) {
+ // add it to the host with the shortest bucket list
+ HostBucket newhb = assignments.pollFirst();
+ assert newhb != null;
+ newhb.add(index);
+
+ // put the item back into the queue, with its new count
+ assignments.add(newhb);
+ }
+ }
+
+ /**
+ * Re-balances the buckets, taking from those that have a larger count and giving to
+ * those that have a smaller count. Populates an output array with the new
+ * assignments.
+ *
+ * @param coll current bucket assignment
+ * @param bucket2host array to be populated with the new assignments
+ */
+ private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
+ if (coll.size() <= 1) {
+ // only one hosts - nothing to rebalance
+ return;
+ }
+
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (;;) {
+ HostBucket smaller = assignments.pollFirst();
+ HostBucket larger = assignments.pollLast();
+
+ assert larger != null && smaller != null;
+ if (larger.size() - smaller.size() <= 1) {
+ // it's as balanced as it will get
+ break;
+ }
+
+ // move the bucket from the larger to the smaller
+ Integer bucket = larger.remove();
+ smaller.add(bucket);
+
+ bucket2host[bucket] = smaller.host;
+
+ // put the items back, with their new counts
+ assignments.add(larger);
+ assignments.add(smaller);
+ }
+
+ }
+
+ /**
+ * Fills the array with the host assignments.
+ *
+ * @param coll the host assignments
+ * @param bucket2host array to be filled
+ */
+ private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
+ for (HostBucket hb : coll) {
+ for (Integer index : hb.buckets) {
+ bucket2host[index] = hb.host;
+ }
+ }
+ }
+
+ /**
+ * Tracks buckets that have been assigned to a host.
+ */
+ protected static class HostBucket implements Comparable<HostBucket> {
+ /**
+ * Host to which the buckets have been assigned.
+ */
+ private String host;
+
+ /**
+ * Buckets that have been assigned to this host.
+ */
+ private Queue<Integer> buckets = new LinkedList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param host host
+ */
+ public HostBucket(String host) {
+ this.host = host;
+ }
+
+ /**
+ * Removes the next bucket from the list.
+ *
+ * @return the next bucket
+ */
+ public final Integer remove() {
+ return buckets.remove();
+ }
+
+ /**
+ * Adds a bucket to the list.
+ *
+ * @param index index of the bucket to add
+ */
+ public final void add(Integer index) {
+ buckets.add(index);
+ }
+
+ /**
+ * Size.
+ *
+ * @return the number of buckets assigned to this host
+ */
+ public final int size() {
+ return buckets.size();
+ }
+
+ /**
+ * Compares host buckets, first by the number of buckets, and then by the host
+ * name.
+ */
+ @Override
+ public final int compareTo(HostBucket other) {
+ int diff = buckets.size() - other.buckets.size();
+ if (diff == 0) {
+ diff = host.compareTo(other.host);
+ }
+ return diff;
+ }
+
+ @Override
+ public final int hashCode() {
+ throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ throw new UnsupportedOperationException("cannot compare HostBuckets");
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
new file mode 100644
index 00000000..ef401dcb
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -0,0 +1,204 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Query state. In this state, the host waits for the other hosts to identify
+ * themselves. Eventually, a leader should come forth. If not, it will transition to the
+ * active or inactive state, depending on whether it has an assignment in the
+ * current bucket assignments. The other possibility is that it may <i>become</i> the
+ * leader, in which case it will also transition to the active state.
+ */
+public class QueryState extends ProcessingState {
+
+ private static final Logger logger = LoggerFactory.getLogger(QueryState.class);
+
+ /**
+ * Hosts that have sent an "Identification" message. Always includes this host.
+ */
+ private final TreeSet<String> alive = new TreeSet<>();
+
+ /**
+ * {@code True} if we saw our own Identification method, {@code false} otherwise.
+ */
+ private boolean sawSelfIdent = false;
+
+ /**
+ * Constructor.
+ *
+ * @param mgr manager
+ */
+ public QueryState(PoolingManager mgr) {
+ // this host is the leader, until a better candidate identifies itself
+ super(mgr, mgr.getHost());
+
+ alive.add(getHost());
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ // start identification timer
+ awaitIdentification();
+ }
+
+ /**
+ * Starts a timer to wait for all Identification messages to arrive.
+ */
+ private void awaitIdentification() {
+
+ /*
+ * Once we've waited long enough for all Identification messages to arrive, become
+ * the leader, assuming we should.
+ */
+
+ schedule(getProperties().getIdentificationMs(), () -> {
+
+ if (!sawSelfIdent) {
+ // didn't see our identification
+ logger.error("missed our own Ident message on topic {}", getTopic());
+ return missedHeartbeat();
+
+ } else if (isLeader()) {
+ // "this" host is the new leader
+ logger.info("this host is the new leader for topic {}", getTopic());
+ return becomeLeader(alive);
+
+ } else {
+ // not the leader - return to previous state
+ logger.info("no new leader on topic {}", getTopic());
+ return goActive(getAssignments());
+ }
+ });
+ }
+
+ @Override
+ public State goQuery() {
+ return null;
+ }
+
+ @Override
+ public State process(Identification msg) {
+
+ if (getHost().equals(msg.getSource())) {
+ logger.info("saw our own Ident message on topic {}", getTopic());
+ sawSelfIdent = true;
+
+ } else {
+ logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic());
+ recordInfo(msg.getSource(), msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * If the message leader is better than the leader we have, then go active with it.
+ * Otherwise, simply treat it like an {@link Identification} message.
+ */
+ @Override
+ public State process(Leader msg) {
+ if (!isValid(msg)) {
+ return null;
+ }
+
+ String source = msg.getSource();
+ BucketAssignments asgn = msg.getAssignments();
+
+ // go active, if this has a leader that's the same or better than the one we have
+ if (source.compareTo(getLeader()) <= 0) {
+ logger.warn("leader with {} on topic {}", source, getTopic());
+ return goActive(asgn);
+ }
+
+ /*
+ * The message does not have an acceptable leader, but we'll still record its
+ * info.
+ */
+ logger.info("record leader info from {} on topic {}", source, getTopic());
+ recordInfo(source, asgn);
+
+ return null;
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ logger.warn("host {} offline on topic {}", host, getTopic());
+ alive.remove(host);
+ setLeader(alive.first());
+
+ } else {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ }
+
+ return null;
+ }
+
+ /**
+ * Records info from a message, adding the source host name to {@link #alive}, and
+ * updating the bucket assignments.
+ *
+ * @param source the message's source host
+ * @param assignments assignments, or {@code null}
+ */
+ private void recordInfo(String source, BucketAssignments assignments) {
+ // add this message's source host to "alive"
+ if (source != null) {
+ alive.add(source);
+ setLeader(alive.first());
+ }
+
+ if (assignments == null || assignments.getLeader() == null) {
+ return;
+ }
+
+ // record assignments, if we don't have any yet
+ BucketAssignments current = getAssignments();
+ if (current == null) {
+ logger.info("received initial assignments on topic {}", getTopic());
+ setAssignments(assignments);
+ return;
+ }
+
+ /*
+ * Record assignments, if the new assignments have a better (i.e., lesser) leader.
+ */
+ String curldr = current.getLeader();
+ if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) {
+ logger.info("use new assignments from {} on topic {}", source, getTopic());
+ setAssignments(assignments);
+ }
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
new file mode 100644
index 00000000..73717d7c
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import lombok.Getter;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The start state. Upon entry, a heart beat is generated and the event filter is changed
+ * to look for just that particular message. Once the message is seen, it goes into the
+ * {@link QueryState}.
+ */
+@Getter
+public class StartState extends State {
+
+ private static final Logger logger = LoggerFactory.getLogger(StartState.class);
+
+ /**
+ * Time stamp inserted into the heart beat message.
+ */
+ private long hbTimestampMs = System.currentTimeMillis();
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ */
+ public StartState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ var hb = makeHeartbeat(hbTimestampMs);
+ publish(getHost(), hb);
+
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, () -> {
+ publish(getHost(), hb);
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
+ schedule(getProperties().getStartHeartbeatMs(), () -> {
+ logger.error("missed heartbeat on topic {}", getTopic());
+ return internalTopicFailed();
+ });
+ }
+
+ /**
+ * Transitions to the query state if the heart beat originated from this host and its
+ * time stamp matches.
+ */
+ @Override
+ public State process(Heartbeat msg) {
+ if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) {
+ // saw our own heart beat - transition to query state
+ logger.info("saw our own heartbeat on topic {}", getTopic());
+ publish(makeQuery());
+ return goQuery();
+
+ } else {
+ logger.info("ignored old heartbeat message from {} on topic {}", msg.getSource(), getTopic());
+ }
+
+ return null;
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java
new file mode 100644
index 00000000..e2cf9586
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -0,0 +1,373 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A state in the finite state machine.
+ *
+ * <p>A state may have several timers associated with it, which must be cancelled whenever
+ * the state is changed. Assumes that timers are not continuously added to the same state.
+ */
+public abstract class State {
+
+ private static final Logger logger = LoggerFactory.getLogger(State.class);
+
+ /**
+ * Host pool manager.
+ */
+ private final PoolingManager mgr;
+
+ /**
+ * Timers added by this state.
+ */
+ private final List<CancellableScheduledTask> timers = new LinkedList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param mgr pooling manager
+ */
+ protected State(PoolingManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Cancels the timers added by this state.
+ */
+ public final void cancelTimers() {
+ timers.forEach(CancellableScheduledTask::cancel);
+ }
+
+ /**
+ * Starts the state. The default method simply logs a message and returns.
+ */
+ public void start() {
+ logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
+ }
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ public final State goStart() {
+ return mgr.goStart();
+ }
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ public State goQuery() {
+ return mgr.goQuery();
+ }
+
+ /**
+ * Goes active with a new set of assignments.
+ *
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
+ */
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn != null && asgn.hasAssignment(getHost())) {
+ return mgr.goActive();
+
+ } else {
+ return goInactive();
+ }
+ }
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ protected State goInactive() {
+ return mgr.goInactive();
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Heartbeat msg) {
+ logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Identification msg) {
+ logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method copies the assignments and then returns
+ * {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Leader msg) {
+ if (isValid(msg)) {
+ logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Offline msg) {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Query msg) {
+ logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Determines if a message is valid and did not originate from this host.
+ *
+ * @param msg message to be validated
+ * @return {@code true} if the message is valid, {@code false} otherwise
+ */
+ protected boolean isValid(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
+ return false;
+ }
+
+ // ignore Leader messages from ourself
+ String source = msg.getSource();
+ if (source == null || source.equals(getHost())) {
+ logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
+ return false;
+ }
+
+ // the new leader must equal the source
+ boolean result = source.equals(asgn.getLeader());
+
+ if (!result) {
+ logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
+ }
+
+ return result;
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Identification msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Leader msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Offline msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected final void publish(Query msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel channel
+ * @param msg message to be published
+ */
+ protected final void publish(String channel, Heartbeat msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Starts distributing messages using the specified bucket assignments.
+ *
+ * @param assignments assignments
+ */
+ protected final void startDistributing(BucketAssignments assignments) {
+ if (assignments != null) {
+ mgr.startDistributing(assignments);
+ }
+ }
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs delay in ms
+ * @param task task
+ */
+ protected final void schedule(long delayMs, StateTimerTask task) {
+ timers.add(mgr.schedule(delayMs, task));
+ }
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs initial delay ms
+ * @param delayMs delay ms
+ * @param task task
+ */
+ protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
+ }
+
+ /**
+ * Indicates that we failed to see our own heartbeat; must be a problem with the
+ * internal topic. Assumes the problem is temporary and continues to use the current
+ * bucket assignments.
+ *
+ * @return a new {@link StartState}
+ */
+ protected final State missedHeartbeat() {
+ publish(makeOffline());
+
+ return mgr.goStart();
+ }
+
+ /**
+ * Indicates that the internal topic failed; this should only be invoked from the
+ * StartState. Discards bucket assignments and begins processing everything locally.
+ *
+ * @return a new {@link InactiveState}
+ */
+ protected final State internalTopicFailed() {
+ publish(makeOffline());
+ mgr.startDistributing(null);
+
+ return mgr.goInactive();
+ }
+
+ /**
+ * Makes a heart beat message.
+ *
+ * @param timestampMs time, in milliseconds, associated with the message
+ *
+ * @return a new message
+ */
+ protected final Heartbeat makeHeartbeat(long timestampMs) {
+ return new Heartbeat(getHost(), timestampMs);
+ }
+
+ /**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
+ * Makes an "offline" message.
+ *
+ * @return a new message
+ */
+ protected final Offline makeOffline() {
+ return new Offline(getHost());
+ }
+
+ /**
+ * Makes a query message.
+ *
+ * @return a new message
+ */
+ protected final Query makeQuery() {
+ return new Query(getHost());
+ }
+
+ public final BucketAssignments getAssignments() {
+ return mgr.getAssignments();
+ }
+
+ public final String getHost() {
+ return mgr.getHost();
+ }
+
+ public final String getTopic() {
+ return mgr.getTopic();
+ }
+
+ public final PoolingProperties getProperties() {
+ return mgr.getProperties();
+ }
+}
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
new file mode 100644
index 00000000..892a1767
--- /dev/null
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+/**
+ * Task to be executed when a timer fires within a {@link State}.
+ */
+@FunctionalInterface
+public interface StateTimerTask {
+
+ /**
+ * Fires the timer.
+ *
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ State fire();
+
+}