diff options
Diffstat (limited to 'feature-pooling-messages/src/main')
29 files changed, 4010 insertions, 0 deletions
diff --git a/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties b/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties new file mode 100644 index 00000000..925d1698 --- /dev/null +++ b/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties @@ -0,0 +1,89 @@ +### +# ============LICENSE_START======================================================= +# feature-pooling-messages +# ================================================================================ +# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Modifications Copyright (C) 2024 Nordix Foundation. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +# In general, the feature-specific properties begin with "pooling", +# and they may be made specific to a controller by prepending with +# "pooling.<controller-name>", instead. +# +# The available properties and their default values are shown below. + +# Whether the feature is enabled. +#pooling.enabled=false + +# The internal kafka topic used by a controller. Note: the controller +# name is required for this property. +#pooling.<controller-name>.topic = + +# Maximum number of events to retain in the queue while a new host waits +# to be assigned work. +#pooling.offline.queue.limit=1000 + +# Maximum age, in milliseconds, of events to be retained in the queue. +# Events older than this are discarded. +#pooling.offline.queue.age.milliseconds=60000 + +# Time, in milliseconds, to wait for an "Offline" message to be published +# to topic manager before the connection may be closed. +#pooling.offline.publish.wait.milliseconds=3000 + +# Time, in milliseconds, to wait for this host's initial heart beat. This +# is used to verify connectivity to the internal topic. +#pooling.start.heartbeat.milliseconds=100000 + +# Time, in milliseconds, to wait before attempting to reactivate this +# host when it was not assigned any work. +#pooling.reactivate.milliseconds=50000 + +# Time, in milliseconds, to wait for other hosts to identify themselves +# when this host is started. +#pooling.identification.milliseconds=50000 + +# Time, in milliseconds, to wait for heart beats from this host, or its +# predecessor, during the active state. +#pooling.active.heartbeat.milliseconds=50000 + +# Time, in milliseconds, to wait between heart beat generations. +#pooling.inter.heartbeat.milliseconds=15000 + +# Topic used for inter-host communication for a particular controller +# pooling.<controller-name>.topic=XXX + +# Each controller that is enabled should have its own topic and the +# corresponding ${topicManager}.xxx properties (using kafka as default). +# However, for now, just assume that the usecases features will not both +# be enabled at the same time. + +pooling.usecases.enabled=true +pooling.usecases.topic=${env:POOLING_TOPIC} + +# the list of sources and sinks should be identical +kafka.source.topics=POOLING_TOPIC +kafka.sink.topics=POOLING_TOPIC + +kafka.source.topics.POOLING_TOPIC.servers=${env:KAFKA_SERVERS} +kafka.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} +kafka.source.topics.POOLING_TOPIC.apiKey= +kafka.source.topics.POOLING_TOPIC.apiSecret= + +kafka.sink.topics.POOLING_TOPIC.servers=${env:kafka_SERVERS} +kafka.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} +kafka.sink.topics.POOLING_TOPIC.apiKey= +kafka.sink.topics.POOLING_TOPIC.apiSecret= diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java new file mode 100644 index 00000000..4e9112e6 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +/** + * A scheduled task that can be cancelled. + */ +@FunctionalInterface +public interface CancellableScheduledTask { + + /** + * Cancels the scheduled task. + */ + void cancel(); +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java new file mode 100644 index 00000000..6411dd81 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -0,0 +1,397 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.properties.SpecProperties; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.features.DroolsControllerFeatureApi; +import org.onap.policy.drools.features.PolicyControllerFeatureApi; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyControllerConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.util.FeatureEnabledChecker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Controller/session pooling. Multiple hosts may be launched, all servicing the same + * controllers/sessions. When this feature is enabled, the requests are divided across the different + * hosts, instead of all running on a single, active host. + * + * <p>With each controller, there is an + * associated DMaaP topic that is used for internal communication between the different hosts + * serving the controller. + */ +public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi { + + private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class); + + /** + * ID of this host. + */ + @Getter + private final String host; + + /** + * Entire set of feature properties, including those specific to various controllers. + */ + private Properties featProps = null; + + /** + * Maps a controller name to its associated manager. + */ + private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + + /** + * Decremented each time a manager enters the Active state. Used by junit tests. + */ + @Getter(AccessLevel.PROTECTED) + private final CountDownLatch activeLatch = new CountDownLatch(1); + + /** + * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is + * called later. As multiple threads can be active within the methods at the same + * time, we must keep this in thread local storage. + */ + private ThreadLocal<String> offerTopics = new ThreadLocal<>(); + + /** + * Constructor. + */ + public PoolingFeature() { + super(); + + this.host = UUID.randomUUID().toString(); + } + + @Override + public int getSequenceNumber() { + return 0; + } + + @Override + public boolean beforeStart(PolicyEngine engine) { + logger.info("initializing {}", PoolingProperties.FEATURE_NAME); + featProps = getProperties(PoolingProperties.FEATURE_NAME); + + // remove any generic pooling topic - always use controller-specific property + featProps.remove(PoolingProperties.POOLING_TOPIC); + + initTopicSources(featProps); + initTopicSinks(featProps); + + return false; + } + + @Override + public boolean beforeStart(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.beforeStart(); + return false; + }); + } + + /** + * Adds the controller and a new pooling manager to {@link #ctlr2pool}. + * + * @throws PoolingFeatureRtException if an error occurs + */ + @Override + public boolean afterCreate(PolicyController controller) { + + if (featProps == null) { + logger.error("pooling feature properties have not been loaded"); + throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties")); + } + + String name = controller.getName(); + + var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps); + + if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) { + try { + // get & validate the properties + var props = new PoolingProperties(name, featProps); + + logger.info("pooling enabled for {}", name); + ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch)); + + } catch (PropertyException e) { + logger.error("pooling disabled due to exception for {}", name); + throw new PoolingFeatureRtException(e); + } + + } else { + logger.info("pooling disabled for {}", name); + } + + + return false; + } + + @Override + public boolean afterStart(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.afterStart(); + return false; + }); + } + + @Override + public boolean beforeStop(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.beforeStop(); + return false; + }); + } + + @Override + public boolean afterStop(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.afterStop(); + return false; + }); + } + + @Override + public boolean afterShutdown(PolicyController controller) { + return commonShutdown(controller); + } + + @Override + public boolean afterHalt(PolicyController controller) { + return commonShutdown(controller); + } + + private boolean commonShutdown(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override + public boolean beforeLock(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.beforeLock(); + return false; + }); + } + + @Override + public boolean afterUnlock(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.afterUnlock(); + return false; + }); + } + + @Override + public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) { + /* + * As this is invoked a lot, we'll directly call the manager's method instead of using the + * functional interface via doManager(). + */ + PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); + if (mgr == null) { + return false; + } + + if (mgr.beforeOffer(topic2, event)) { + return true; + } + + offerTopics.set(topic2); + return false; + } + + @Override + public boolean beforeInsert(DroolsController droolsController, Object fact) { + + String topic = offerTopics.get(); + if (topic == null) { + logger.warn("missing arguments for feature-pooling-messages in beforeInsert"); + return false; + } + + PolicyController controller; + try { + controller = getController(droolsController); + + } catch (IllegalArgumentException | IllegalStateException e) { + logger.warn("cannot get controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId(), e); + return false; + } + + + if (controller == null) { + logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId()); + return false; + } + + /* + * As this is invoked a lot, we'll directly call the manager's method instead of using the + * functional interface via doManager(). + */ + PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); + if (mgr == null) { + return false; + } + + return mgr.beforeInsert(topic, fact); + } + + @Override + public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event, + boolean success) { + + // clear any stored arguments + offerTopics.remove(); + + return false; + } + + /** + * Executes a function using the manager associated with the controller. Catches any exceptions + * from the function and re-throws it as a runtime exception. + * + * @param controller controller + * @param func function to be executed + * @return {@code true} if the function handled the request, {@code false} otherwise + * @throws PoolingFeatureRtException if an error occurs + */ + private boolean doManager(PolicyController controller, MgrFunc func) { + PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); + if (mgr == null) { + return false; + } + + try { + return func.apply(mgr); + + } catch (PoolingFeatureException e) { + throw new PoolingFeatureRtException(e); + } + } + + /** + * Deletes the manager associated with a controller. + * + * @param controller controller + * @throws PoolingFeatureRtException if an error occurs + */ + private void deleteManager(PolicyController controller) { + + String name = controller.getName(); + logger.info("remove feature-pooling-messages manager for {}", name); + + ctlr2pool.remove(name); + } + + /** + * Function that operates on a manager. + */ + @FunctionalInterface + private static interface MgrFunc { + + /** + * Apply. + * + * @param mgr manager + * @return {@code true} if the request was handled by the manager, {@code false} otherwise + * @throws PoolingFeatureException feature exception + */ + boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; + } + + /* + * The remaining methods may be overridden by junit tests. + */ + + /** + * Get properties. + * + * @param featName feature name + * @return the properties for the specified feature + */ + protected Properties getProperties(String featName) { + return SystemPersistenceConstants.getManager().getProperties(featName); + } + + /** + * Makes a pooling manager for a controller. + * + * @param host name/uuid of this host + * @param controller controller + * @param props properties to use to configure the manager + * @param activeLatch decremented when the manager goes Active + * @return a new pooling manager + */ + protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + return new PoolingManagerImpl(host, controller, props, activeLatch); + } + + /** + * Gets the policy controller associated with a drools controller. + * + * @param droolsController drools controller + * @return the policy controller associated with a drools controller + */ + protected PolicyController getController(DroolsController droolsController) { + return PolicyControllerConstants.getFactory().get(droolsController); + } + + /** + * Initializes the topic sources. + * + * @param props properties used to configure the topics + * @return the topic sources + */ + protected List<TopicSource> initTopicSources(Properties props) { + return TopicEndpointManager.getManager().addTopicSources(props); + } + + /** + * Initializes the topic sinks. + * + * @param props properties used to configure the topics + * @return the topic sinks + */ + protected List<TopicSink> initTopicSinks(Properties props) { + return TopicEndpointManager.getManager().addTopicSinks(props); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java new file mode 100644 index 00000000..5d7b9f76 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import java.io.Serial; + +/** + * Exception thrown by the pooling feature. + */ +public class PoolingFeatureException extends Exception { + @Serial + private static final long serialVersionUID = 1L; + + public PoolingFeatureException() { + super(); + } + + public PoolingFeatureException(String message) { + super(message); + } + + public PoolingFeatureException(Throwable cause) { + super(cause); + } + + public PoolingFeatureException(String message, Throwable cause) { + super(message, cause); + } + + public PoolingFeatureException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java new file mode 100644 index 00000000..5d0a2755 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import java.io.Serial; + +/** + * A runtime exception thrown by the pooling feature. + */ +public class PoolingFeatureRtException extends RuntimeException { + @Serial + private static final long serialVersionUID = 1L; + + public PoolingFeatureRtException() { + super(); + } + + public PoolingFeatureRtException(String message) { + super(message); + } + + public PoolingFeatureRtException(Throwable cause) { + super(cause); + } + + public PoolingFeatureRtException(String message, Throwable cause) { + super(message, cause); + } + + public PoolingFeatureRtException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java new file mode 100644 index 00000000..5e358e61 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -0,0 +1,133 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.state.State; +import org.onap.policy.drools.pooling.state.StateTimerTask; + +/** + * Pooling manager for a single PolicyController. + */ +public interface PoolingManager { + + /** + * Gets the properties used to configure the manager. + * + * @return pooling properties + */ + PoolingProperties getProperties(); + + /** + * Gets the host id. + * + * @return the host id + */ + String getHost(); + + /** + * Gets the name of the internal DMaaP topic used by this manager to communicate with + * its other hosts. + * + * @return the name of the internal DMaaP topic + */ + String getTopic(); + + /** + * Starts distributing requests according to the given bucket assignments. + * + * @param assignments must <i>not</i> be {@code null} + */ + void startDistributing(BucketAssignments assignments); + + /** + * Gets the current bucket assignments. + * + * @return the current bucket assignments, or {@code null} if no assignments have been + * made + */ + BucketAssignments getAssignments(); + + /** + * Publishes a message to the internal topic on the administrative channel. + * + * @param msg message to be published + */ + void publishAdmin(Message msg); + + /** + * Publishes a message to the internal topic on a particular channel. + * + * @param channel channel on which the message should be published + * @param msg message to be published + */ + void publish(String channel, Message msg); + + /** + * Schedules a timer to fire after a delay. + * + * @param delayMs delay, in milliseconds + * @param task task + * @return a new scheduled task + */ + CancellableScheduledTask schedule(long delayMs, StateTimerTask task); + + /** + * Schedules a timer to fire repeatedly. + * + * @param initialDelayMs initial delay, in milliseconds + * @param delayMs delay, in milliseconds + * @param task task + * @return a new scheduled task + */ + CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); + + /** + * Transitions to the "start" state. + * + * @return the new state + */ + State goStart(); + + /** + * Transitions to the "query" state. + * + * @return the new state + */ + State goQuery(); + + /** + * Transitions to the "active" state. + * + * @return the new state + */ + State goActive(); + + /** + * Transitions to the "inactive" state. + * + * @return the new state + */ + State goInactive(); + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java new file mode 100644 index 00000000..7c0436eb --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -0,0 +1,647 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import com.google.gson.JsonParseException; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.state.ActiveState; +import org.onap.policy.drools.pooling.state.IdleState; +import org.onap.policy.drools.pooling.state.InactiveState; +import org.onap.policy.drools.pooling.state.QueryState; +import org.onap.policy.drools.pooling.state.StartState; +import org.onap.policy.drools.pooling.state.State; +import org.onap.policy.drools.pooling.state.StateTimerTask; +import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants; +import org.onap.policy.drools.system.PolicyController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, + * events coming from external topics are saved in a queue for later processing. Once + * assignments are made, the saved events are processed. In addition, while the controller + * is locked, events are still forwarded to other hosts and bucket assignments are still + * updated, based on any {@link Leader} messages that it receives. + */ +public class PoolingManagerImpl implements PoolingManager, TopicListener { + + private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class); + + /** + * Maximum number of times a message can be forwarded. + */ + public static final int MAX_HOPS = 5; + + /** + * ID of this host. + */ + @Getter + private final String host; + + /** + * Properties with which this was configured. + */ + @Getter + private final PoolingProperties properties; + + /** + * Associated controller. + */ + private final PolicyController controller; + + /** + * Decremented each time the manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch; + + /** + * Used to encode & decode request objects received from & sent to a rule engine. + */ + private final Serializer serializer; + + /** + * Internal DMaaP topic used by this controller. + */ + @Getter + private final String topic; + + /** + * Manager for the internal DMaaP topic. + */ + private final TopicMessageManager topicMessageManager; + + /** + * Lock used while updating {@link #current}. In general, public methods must use + * this, while private methods assume the lock is already held. + */ + private final Object curLocker = new Object(); + + /** + * Current state. + * + * <p>This uses a finite state machine, wherein the state object contains all of the data + * relevant to that state. Each state object has a process() method, specific to each + * type of {@link Message} subclass. The method returns the next state object, or + * {@code null} if the state is to remain the same. + */ + private State current; + + /** + * Current bucket assignments or {@code null}. + */ + @Getter + private BucketAssignments assignments = null; + + /** + * Pool used to execute timers. + */ + private ScheduledThreadPoolExecutor scheduler = null; + + /** + * Constructs the manager, initializing all the data structures. + * + * @param host name/uuid of this host + * @param controller controller with which this is associated + * @param props feature properties specific to the controller + * @param activeLatch latch to be decremented each time the manager enters the Active + * state + */ + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + this.host = host; + this.controller = controller; + this.properties = props; + this.activeLatch = activeLatch; + + try { + this.serializer = new Serializer(); + this.topic = props.getPoolingTopic(); + this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic()); + this.current = new IdleState(this); + + logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); + + } catch (ClassCastException e) { + logger.error("not a topic listener, controller {}", controller.getName()); + throw new PoolingFeatureRtException(e); + + } catch (PoolingFeatureException e) { + logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName()); + throw new PoolingFeatureRtException(e); + } + } + + /** + * Should only be used by junit tests. + * + * @return the current state + */ + protected State getCurrent() { + synchronized (curLocker) { + return current; + } + } + + /** + * Indicates that the controller is about to start. Starts the publisher for the + * internal topic, and creates a thread pool for the timers. + */ + public void beforeStart() { + synchronized (curLocker) { + if (scheduler == null) { + topicMessageManager.startPublisher(); + + logger.debug("make scheduler thread for topic {}", getTopic()); + scheduler = makeScheduler(); + + /* + * Only a handful of timers at any moment, thus we can afford to take the + * time to remove them when they're cancelled. + */ + scheduler.setRemoveOnCancelPolicy(true); + scheduler.setMaximumPoolSize(1); + scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + } + } + } + + /** + * Indicates that the controller has successfully started. Starts the consumer for the + * internal topic, enters the {@link StartState}, and sets the filter for the initial + * state. + */ + public void afterStart() { + synchronized (curLocker) { + if (current instanceof IdleState) { + topicMessageManager.startConsumer(this); + changeState(new StartState(this)); + } + } + } + + /** + * Indicates that the controller is about to stop. Stops the consumer, the scheduler, + * and the current state. + */ + public void beforeStop() { + ScheduledThreadPoolExecutor sched; + + synchronized (curLocker) { + sched = scheduler; + scheduler = null; + + if (!(current instanceof IdleState)) { + changeState(new IdleState(this)); + topicMessageManager.stopConsumer(this); + publishAdmin(new Offline(getHost())); + } + + assignments = null; + } + + if (sched != null) { + logger.debug("stop scheduler for topic {}", getTopic()); + sched.shutdownNow(); + } + } + + /** + * Indicates that the controller has stopped. Stops the publisher and logs a warning + * if any events are still in the queue. + */ + public void afterStop() { + synchronized (curLocker) { + /* + * stop the publisher, but allow time for any Offline message to be + * transmitted + */ + topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs()); + } + } + + /** + * Indicates that the controller is about to be locked. Enters the idle state, as all + * it will be doing is forwarding messages. + */ + public void beforeLock() { + logger.info("locking manager for topic {}", getTopic()); + + synchronized (curLocker) { + changeState(new IdleState(this)); + } + } + + /** + * Indicates that the controller has been unlocked. Enters the start state, if the + * controller is running. + */ + public void afterUnlock() { + logger.info("unlocking manager for topic {}", getTopic()); + + synchronized (curLocker) { + if (controller.isAlive() && current instanceof IdleState && scheduler != null) { + changeState(new StartState(this)); + } + } + } + + /** + * Changes the finite state machine to a new state, provided the new state is not + * {@code null}. + * + * @param newState new state, or {@code null} if to remain unchanged + */ + private void changeState(State newState) { + if (newState != null) { + current.cancelTimers(); + current = newState; + + newState.start(); + } + } + + @Override + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return () -> fut.cancel(false); + } + + @Override + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, + TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return () -> fut.cancel(false); + } + + @Override + public void publishAdmin(Message msg) { + publish(Message.ADMIN, msg); + } + + @Override + public void publish(String channel, Message msg) { + logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic()); + + msg.setChannel(channel); + + try { + // ensure it's valid before we send it + msg.checkValidity(); + + String txt = serializer.encodeMsg(msg); + topicMessageManager.publish(txt); + + } catch (JsonParseException e) { + logger.error("failed to serialize message for topic {} channel {}", topic, channel, e); + + } catch (PoolingFeatureException e) { + logger.error("failed to publish message for topic {} channel {}", topic, channel, e); + } + } + + /** + * Handles an event from the internal topic. + * + * @param commType comm infrastructure + * @param topic2 topic + * @param event event + */ + @Override + public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { + + if (event == null) { + logger.error("null event on topic {}", topic); + return; + } + + synchronized (curLocker) { + // it's on the internal topic + handleInternal(event); + } + } + + /** + * Called by the PolicyController before it offers the event to the DroolsController. + * If the controller is locked, then it isn't processing events. However, they still + * need to be forwarded, thus in that case, they are decoded and forwarded. + * + * <p>On the other hand, if the controller is not locked, then we just return immediately + * and let {@link #beforeInsert(String, Object) beforeInsert()} handle + * it instead, as it already has the decoded message. + * + * @param topic2 topic + * @param event event + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + public boolean beforeOffer(String topic2, String event) { + + if (!controller.isLocked()) { + // we should NOT intercept this message - let the invoker handle it + return false; + } + + return handleExternal(topic2, decodeEvent(topic2, event)); + } + + /** + * Called by the DroolsController before it inserts the event into the rule engine. + * + * @param topic2 topic + * @param event event, as an object + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + public boolean beforeInsert(String topic2, Object event) { + return handleExternal(topic2, event); + } + + /** + * Handles an event from an external topic. + * + * @param topic2 topic + * @param event event, as an object, or {@code null} if it cannot be decoded + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker + */ + private boolean handleExternal(String topic2, Object event) { + if (event == null) { + // no event - let the invoker handle it + return false; + } + + synchronized (curLocker) { + return handleExternal(topic2, event, event.hashCode()); + } + } + + /** + * Handles an event from an external topic. + * + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it + */ + private boolean handleExternal(String topic2, Object event, int eventHashCode) { + if (assignments == null) { + // no bucket assignments yet - handle locally + logger.info("handle event locally for request {}", event); + + // we did NOT consume the event + return false; + + } else { + return handleEvent(topic2, event, eventHashCode); + } + } + + /** + * Handles a {@link Forward} event, possibly forwarding it again. + * + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it + */ + private boolean handleEvent(String topic2, Object event, int eventHashCode) { + String target = assignments.getAssignedHost(eventHashCode); + + if (target == null) { + /* + * This bucket has no assignment - just discard the event + */ + logger.warn("discarded event for unassigned bucket from topic {}", topic2); + return true; + } + + if (target.equals(host)) { + /* + * Message belongs to this host - allow the controller to handle it. + */ + logger.info("handle local event for request {} from topic {}", event, topic2); + return false; + } + + // not our message, consume the event + logger.warn("discarded event for host {} from topic {}", target, topic2); + return true; + } + + /** + * Decodes an event from a String into an event Object. + * + * @param topic2 topic + * @param event event + * @return the decoded event object, or {@code null} if it can't be decoded + */ + private Object decodeEvent(String topic2, String event) { + DroolsController drools = controller.getDrools(); + + // check if this topic has a decoder + + if (!canDecodeEvent(drools, topic2)) { + + logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(), + drools.getArtifactId()); + return null; + } + + // decode + + try { + return decodeEventWrapper(drools, topic2, event); + + } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) { + logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e); + return null; + } + } + + /** + * Handles an event from the internal topic. This uses reflection to identify the + * appropriate process() method to invoke, based on the type of Message that was + * decoded. + * + * @param event the serialized {@link Message} read from the internal topic + */ + private void handleInternal(String event) { + Class<?> clazz = null; + + try { + Message msg = serializer.decodeMsg(event); + + // get the class BEFORE checking the validity + clazz = msg.getClass(); + + msg.checkValidity(); + + var meth = current.getClass().getMethod("process", msg.getClass()); + changeState((State) meth.invoke(current, msg)); + + } catch (JsonParseException e) { + logger.warn("failed to decode message for topic {}", topic, e); + + } catch (NoSuchMethodException | SecurityException e) { + logger.error("no processor for message {} for topic {}", clazz, topic, e); + + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | PoolingFeatureException e) { + logger.error("failed to process message {} for topic {}", clazz, topic, e); + } + } + + @Override + public void startDistributing(BucketAssignments asgn) { + synchronized (curLocker) { + int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); + logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); + assignments = asgn; + } + } + + @Override + public State goStart() { + return new StartState(this); + } + + @Override + public State goQuery() { + return new QueryState(this); + } + + @Override + public State goActive() { + activeLatch.countDown(); + return new ActiveState(this); + } + + @Override + public State goInactive() { + return new InactiveState(this); + } + + /** + * Action to run a timer task. Only runs the task if the machine is still in the state + * that it was in when the timer was created. + */ + private class TimerAction implements Runnable { + + /** + * State of the machine when the timer was created. + */ + private State origState; + + /** + * Task to be executed. + */ + private StateTimerTask task; + + /** + * Constructor. + * + * @param task task to execute when this timer runs + */ + public TimerAction(StateTimerTask task) { + this.origState = current; + this.task = task; + } + + @Override + public void run() { + synchronized (curLocker) { + if (current == origState) { + changeState(task.fire()); + } + } + } + } + + /** + * Creates a DMaaP manager. + * + * @param topic name of the internal DMaaP topic + * @return a new topic messages manager + * @throws PoolingFeatureException if an error occurs + */ + protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException { + return new TopicMessageManager(topic); + } + + /** + * Creates a scheduled thread pool. + * + * @return a new scheduled thread pool + */ + protected ScheduledThreadPoolExecutor makeScheduler() { + return new ScheduledThreadPoolExecutor(1); + } + + /** + * Determines if the event can be decoded. + * + * @param drools drools controller + * @param topic topic on which the event was received + * @return {@code true} if the event can be decoded, {@code false} otherwise + */ + protected boolean canDecodeEvent(DroolsController drools, String topic) { + return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), + topic); + } + + /** + * Decodes the event. + * + * @param drools drools controller + * @param topic topic on which the event was received + * @param event event text to be decoded + * @return the decoded event + * @throws IllegalArgumentException illegal argument + * @throws UnsupportedOperationException unsupported operation + * @throws IllegalStateException illegal state + */ + protected Object decodeEventWrapper(DroolsController drools, String topic, String event) { + return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic, + event); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java new file mode 100644 index 00000000..fd1c3d3d --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -0,0 +1,150 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import java.util.Properties; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.SpecProperties; +import org.onap.policy.common.utils.properties.exception.PropertyException; + +/** + * Properties used by the pooling feature, specific to a controller. + */ +@Getter +@Setter +public class PoolingProperties { + + /** + * The feature name, used to retrieve properties. + */ + public static final String FEATURE_NAME = "feature-pooling-messages"; + + /** + * Feature properties all begin with this prefix. + */ + public static final String PREFIX = "pooling."; + + public static final String FEATURE_ENABLED = PREFIX + "enabled"; + public static final String POOLING_TOPIC = PREFIX + "topic"; + public static final String OFFLINE_LIMIT = PREFIX + "offline.queue.limit"; + public static final String OFFLINE_AGE_MS = PREFIX + "offline.queue.age.milliseconds"; + public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "offline.publish.wait.milliseconds"; + public static final String START_HEARTBEAT_MS = PREFIX + "start.heartbeat.milliseconds"; + public static final String REACTIVATE_MS = PREFIX + "reactivate.milliseconds"; + public static final String IDENTIFICATION_MS = PREFIX + "identification.milliseconds"; + public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "active.heartbeat.milliseconds"; + public static final String INTER_HEARTBEAT_MS = PREFIX + "inter.heartbeat.milliseconds"; + + /** + * Type of item that the extractors will be extracting. + */ + public static final String EXTRACTOR_TYPE = "requestId"; + + /** + * Prefix for extractor properties. + */ + public static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; + + /** + * Properties from which this was constructed. + */ + private Properties source; + + /** + * Topic used for inter-host communication. + */ + @Property(name = POOLING_TOPIC) + private String poolingTopic; + + /** + * Maximum number of events to retain in the queue while waiting for + * buckets to be assigned. + */ + @Property(name = OFFLINE_LIMIT, defaultValue = "1000") + private int offlineLimit; + + /** + * Maximum age, in milliseconds, of events to be retained in the queue. + * Events older than this are discarded. + */ + @Property(name = OFFLINE_AGE_MS, defaultValue = "60000") + private long offlineAgeMs; + + /** + * Time, in milliseconds, to wait for an "Offline" message to be published + * to topic. + */ + @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") + private long offlinePubWaitMs; + + /** + * Time, in milliseconds, to wait for this host's heart beat during the + * start-up state. + */ + @Property(name = START_HEARTBEAT_MS, defaultValue = "100000") + private long startHeartbeatMs; + + /** + * Time, in milliseconds, to wait before attempting to reactivate this + * host when it has no bucket assignments. + */ + @Property(name = REACTIVATE_MS, defaultValue = "50000") + private long reactivateMs; + + /** + * Time, in milliseconds, to wait for all Identification messages to + * arrive during the query state. + */ + @Property(name = IDENTIFICATION_MS, defaultValue = "50000") + private long identificationMs; + + /** + * Time, in milliseconds, to wait for heart beats from this host, or its + * predecessor, during the active state. + */ + @Property(name = ACTIVE_HEARTBEAT_MS, defaultValue = "50000") + private long activeHeartbeatMs; + + /** + * Time, in milliseconds, to wait between heart beat generations during + * the active and start-up states. + */ + @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000") + private long interHeartbeatMs; + + /** + * Constructor. + * + * @param controllerName the name of the controller + * @param props set of properties used to configure this + * @throws PropertyException if an error occurs + * + */ + public PoolingProperties(String controllerName, Properties props) throws PropertyException { + source = props; + + new BeanConfigurator().configureFromProperties(this, new SpecProperties(PREFIX, controllerName, props)); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java new file mode 100644 index 00000000..2894b1da --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/Serializer.java @@ -0,0 +1,124 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import java.util.HashMap; +import java.util.Map; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; + +/** + * Serialization helper functions. + */ +public class Serializer { + + /** + * The message type is stored in fields of this name within the JSON. + */ + private static final String TYPE_FIELD = "type"; + + /** + * Used to encode & decode JSON messages sent & received, respectively, on the + * internal topic. + */ + private final Gson gson = new Gson(); + + /** + * Maps a message subclass to its type. + */ + private static final Map<Class<? extends Message>, String> class2type = new HashMap<>(); + + /** + * Maps a message type to the appropriate subclass. + */ + private static final Map<String, Class<? extends Message>> type2class = new HashMap<>(); + + static { + class2type.put(Heartbeat.class, "heartbeat"); + class2type.put(Identification.class, "identification"); + class2type.put(Leader.class, "leader"); + class2type.put(Offline.class, "offline"); + class2type.put(Query.class, "query"); + + class2type.forEach((clazz, type) -> type2class.put(type, clazz)); + } + + /** + * Encodes a filter. + * + * @param filter filter to be encoded + * @return the filter, serialized as a JSON string + */ + public String encodeFilter(Map<String, Object> filter) { + return gson.toJson(filter); + } + + /** + * Encodes a message. + * + * @param msg message to be encoded + * @return the message, serialized as a JSON string + */ + public String encodeMsg(Message msg) { + JsonElement jsonEl = gson.toJsonTree(msg); + + String type = class2type.get(msg.getClass()); + if (type == null) { + throw new JsonParseException("cannot serialize " + msg.getClass()); + } + + jsonEl.getAsJsonObject().addProperty(TYPE_FIELD, type); + + return gson.toJson(jsonEl); + } + + /** + * Decodes a JSON string into a Message. + * + * @param msg JSON string representing the message + * @return the message + */ + public Message decodeMsg(String msg) { + JsonElement jsonEl = gson.fromJson(msg, JsonElement.class); + + JsonElement typeEl = jsonEl.getAsJsonObject().get(TYPE_FIELD); + if (typeEl == null) { + throw new JsonParseException("cannot deserialize " + Message.class + + " because it does not contain a field named " + TYPE_FIELD); + + } + + Class<? extends Message> clazz = type2class.get(typeEl.getAsString()); + if (clazz == null) { + throw new JsonParseException("cannot deserialize " + typeEl); + } + + return gson.fromJson(jsonEl, clazz); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java new file mode 100644 index 00000000..4c9b3f34 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java @@ -0,0 +1,233 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import java.util.List; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the internal topic. Assumes all topics are managed by + * {@link TopicEndpoint}. + */ +public class TopicMessageManager { + + private static final Logger logger = LoggerFactory.getLogger(TopicMessageManager.class); + + /** + * Name of the topic. + */ + @Getter + private final String topic; + + /** + * Topic source whose filter is to be manipulated. + */ + private final TopicSource topicSource; + + /** + * Where to publish messages. + */ + private final TopicSink topicSink; + + /** + * {@code True} if the consumer is running, {@code false} otherwise. + */ + private boolean consuming = false; + + /** + * {@code True} if the publisher is running, {@code false} otherwise. + */ + private boolean publishing = false; + + /** + * Constructs the manager, but does not start the source or sink. + * + * @param topic name of the internal topic + * @throws PoolingFeatureException if an error occurs + */ + public TopicMessageManager(String topic) throws PoolingFeatureException { + + logger.info("initializing bus for topic {}", topic); + + try { + this.topic = topic; + + this.topicSource = findTopicSource(); + this.topicSink = findTopicSink(); + + } catch (IllegalArgumentException e) { + logger.error("failed to attach to topic {}", topic); + throw new PoolingFeatureException(e); + } + } + + /** + * Finds the topic source associated with the internal topic. + * + * @return the topic source + * @throws PoolingFeatureException if the source doesn't exist or is not filterable + */ + private TopicSource findTopicSource() throws PoolingFeatureException { + for (TopicSource src : getTopicSources()) { + if (topic.equals(src.getTopic())) { + return src; + } + } + + throw new PoolingFeatureException("missing topic source " + topic); + } + + /** + * Finds the topic sink associated with the internal topic. + * + * @return the topic sink + * @throws PoolingFeatureException if the sink doesn't exist + */ + private TopicSink findTopicSink() throws PoolingFeatureException { + for (TopicSink sink : getTopicSinks()) { + if (topic.equals(sink.getTopic())) { + return sink; + } + } + + throw new PoolingFeatureException("missing topic sink " + topic); + } + + /** + * Starts the publisher, if it isn't already running. + */ + public void startPublisher() { + if (publishing) { + return; + } + + logger.info("start publishing to topic {}", topic); + publishing = true; + } + + /** + * Stops the publisher. + * + * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and + * close + */ + public void stopPublisher(long waitMs) { + if (!publishing) { + return; + } + + /* + * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs" + * could be passed to sink.stop(), but that isn't an option at this time. + */ + try { + Thread.sleep(waitMs); + + } catch (InterruptedException e) { + logger.warn("message transmission stopped due to {}", e.getMessage()); + Thread.currentThread().interrupt(); + } + + logger.info("stop publishing to topic {}", topic); + publishing = false; + } + + /** + * Starts the consumer, if it isn't already running. + * + * @param listener listener to register with the source + */ + public void startConsumer(TopicListener listener) { + if (consuming) { + return; + } + + logger.info("start consuming from topic {}", topic); + topicSource.register(listener); + consuming = true; + } + + /** + * Stops the consumer. + * + * @param listener listener to unregister with the source + */ + public void stopConsumer(TopicListener listener) { + if (!consuming) { + return; + } + + logger.info("stop consuming from topic {}", topic); + consuming = false; + topicSource.unregister(listener); + } + + /** + * Publishes a message to the sink. + * + * @param msg message to be published + * @throws PoolingFeatureException if an error occurs or the publisher isn't running + */ + public void publish(String msg) throws PoolingFeatureException { + if (!publishing) { + throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic)); + } + + try { + if (!topicSink.send(msg)) { + throw new PoolingFeatureException("failed to send to topic sink " + topic); + } + + } catch (IllegalStateException e) { + throw new PoolingFeatureException("cannot send to topic sink " + topic, e); + } + } + + /* + * The remaining methods may be overridden by junit tests. + */ + + /** + * Get topic source. + * + * @return the topic sources + */ + protected List<TopicSource> getTopicSources() { + return TopicEndpointManager.getManager().getTopicSources(); + } + + /** + * Get topic sinks. + * + * @return the topic sinks + */ + protected List<TopicSink> getTopicSinks() { + return TopicEndpointManager.getManager().getTopicSinks(); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java new file mode 100644 index 00000000..485a9d2e --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -0,0 +1,205 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.onap.policy.drools.pooling.PoolingFeatureException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Bucket assignments, which is simply an array of host names. + */ +@Getter +@Setter +@NoArgsConstructor +public class BucketAssignments { + + private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class); + + /** + * The number of bits in the maximum number of buckets. + */ + private static final int MAX_BUCKET_BITS = 10; + + /** + * Maximum number of buckets. Must be a power of two. + */ + public static final int MAX_BUCKETS = 1 << MAX_BUCKET_BITS; + + /** + * Used to ensure that a hash code is not negative. + */ + private static final int MAX_BUCKETS_MASK = MAX_BUCKETS - 1; + + /** + * Identifies the host serving a particular bucket. + */ + private String[] hostArray = null; + + + /** + * Constructor. + * + * @param hostArray maps a bucket number (i.e., array index) to a host. All values + * must be non-null + */ + public BucketAssignments(String[] hostArray) { + this.hostArray = hostArray; + } + + /** + * Gets the leader, which is the host with the minimum UUID. + * + * @return the assignment leader + */ + public String getLeader() { + if (hostArray == null) { + return null; + } + + String leader = null; + + for (String host : hostArray) { + if (host != null && (leader == null || host.compareTo(leader) < 0)) { + leader = host; + } + } + + return leader; + + } + + /** + * Determines if a host has an assignment. + * + * @param host host to be checked + * @return {@code true} if the host has an assignment, {@code false} otherwise + */ + public boolean hasAssignment(String host) { + if (hostArray == null) { + return false; + } + + for (String host2 : hostArray) { + if (host.equals(host2)) { + return true; + } + } + + return false; + } + + /** + * Gets all the hosts that have an assignment. + * + * @return all the hosts that have an assignment + */ + public Set<String> getAllHosts() { + Set<String> set = new HashSet<>(); + if (hostArray == null) { + return set; + } + + for (String host : hostArray) { + if (host != null) { + set.add(host); + } + } + + return set; + } + + /** + * Gets the host assigned to a given bucket. + * + * @param hashCode hash code of the item whose assignment is desired + * @return the assigned host, or {@code null} if the item has no assigned host + */ + public String getAssignedHost(int hashCode) { + if (hostArray == null || hostArray.length == 0) { + logger.error("no buckets have been assigned"); + return null; + } + + return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length]; + } + + /** + * Gets the number of buckets. + * + * @return the number of buckets + */ + public int size() { + return (hostArray != null ? hostArray.length : 0); + } + + /** + * Checks the validity of the assignments, verifying that all buckets have been + * assigned to a host. + * + * @throws PoolingFeatureException if the assignments are invalid + */ + public void checkValidity() throws PoolingFeatureException { + if (hostArray == null || hostArray.length == 0) { + throw new PoolingFeatureException("missing hosts in message bucket assignments"); + } + + if (hostArray.length > MAX_BUCKETS) { + throw new PoolingFeatureException("too many hosts in message bucket assignments"); + } + + for (var x = 0; x < hostArray.length; ++x) { + if (hostArray[x] == null) { + throw new PoolingFeatureException("bucket " + x + " has no assignment"); + } + } + } + + @Override + public int hashCode() { + final var prime = 31; + var result = 1; + result = prime * result + Arrays.hashCode(hostArray); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BucketAssignments other = (BucketAssignments) obj; + return Arrays.equals(hostArray, other.hostArray); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java new file mode 100644 index 00000000..4a8bdc3b --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * Heart beat message sent to self, or to the succeeding host. + */ +@Getter +@Setter +@NoArgsConstructor +public class Heartbeat extends Message { + + /** + * Time, in milliseconds, when this was created. + */ + private long timestampMs; + + /** + * Constructor. + * + * @param source host on which the message originated + * @param timestampMs time, in milliseconds, associated with the message + */ + public Heartbeat(String source, long timestampMs) { + super(source); + + this.timestampMs = timestampMs; + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java new file mode 100644 index 00000000..b8fd8414 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Identification.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.NoArgsConstructor; + +/** + * Identifies the source host and the bucket assignments which it knows about. + */ +@NoArgsConstructor +public class Identification extends MessageWithAssignments { + + /** + * Constructor. + * + * @param source host on which the message originated + * @param assignments assignments + */ + public Identification(String source, BucketAssignments assignments) { + super(source, assignments); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java new file mode 100644 index 00000000..10c33382 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Leader.java @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.NoArgsConstructor; +import org.onap.policy.drools.pooling.PoolingFeatureException; + +/** + * Indicates that the "source" of this message is now the "lead" host. + */ +@NoArgsConstructor +public class Leader extends MessageWithAssignments { + + /** + * Constructor. + * + * @param source host on which the message originated + * @param assignments assignments + */ + public Leader(String source, BucketAssignments assignments) { + super(source, assignments); + } + + /** + * Also verifies that buckets have been assigned and that the source is + * indeed the leader. + */ + @Override + public void checkValidity() throws PoolingFeatureException { + + super.checkValidity(); + + BucketAssignments assignments = getAssignments(); + if (assignments == null) { + throw new PoolingFeatureException("missing message bucket assignments"); + } + + String leader = getSource(); + + if (!assignments.hasAssignment(leader)) { + throw new PoolingFeatureException("leader " + leader + " has no bucket assignments"); + } + + for (String host : assignments.getHostArray()) { + if (host.compareTo(leader) < 0) { + throw new PoolingFeatureException("invalid leader " + leader + ", should be " + host); + } + } + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java new file mode 100644 index 00000000..bb973142 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Message.java @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.onap.policy.drools.pooling.PoolingFeatureException; + +/** + * Messages sent on the internal topic. + */ +@Getter +@Setter +@NoArgsConstructor +public class Message { + + /** + * Name of the administrative channel. + */ + public static final String ADMIN = "_admin"; + + /** + * Host that originated the message. + */ + private String source; + + /** + * Channel on which the message is routed, which is either the target host + * or {@link #ADMIN}. + */ + private String channel; + + + /** + * Constructor. + * + * @param source host on which the message originated + */ + public Message(String source) { + this.source = source; + } + + /** + * Checks the validity of the message, including verifying that required + * fields are not missing. + * + * @throws PoolingFeatureException if the message is invalid + */ + public void checkValidity() throws PoolingFeatureException { + if (source == null || source.isEmpty()) { + throw new PoolingFeatureException("missing message source"); + } + + if (channel == null || channel.isEmpty()) { + throw new PoolingFeatureException("missing message channel"); + } + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java new file mode 100644 index 00000000..b8521dd8 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.onap.policy.drools.pooling.PoolingFeatureException; + +/** + * A Message that includes bucket assignments. + */ +@Setter +@Getter +@NoArgsConstructor +public class MessageWithAssignments extends Message { + + /** + * Bucket assignments, as known by the source host. + */ + private BucketAssignments assignments; + + + /** + * Constructor. + * + * @param source host on which the message originated + * @param assignments assignments + */ + public MessageWithAssignments(String source, BucketAssignments assignments) { + super(source); + + this.assignments = assignments; + } + + /** + * If there are any assignments, it verifies there validity. + */ + @Override + public void checkValidity() throws PoolingFeatureException { + + super.checkValidity(); + + if (assignments != null) { + assignments.checkValidity(); + } + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java new file mode 100644 index 00000000..3ff7bf1e --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Offline.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.NoArgsConstructor; + +/** + * Indicates that the source host is going offline and will be unable to process + * any further requests. + */ +@NoArgsConstructor +public class Offline extends Message { + + /** + * Constructor. + * + * @param source host on which the message originated + */ + public Offline(String source) { + super(source); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java new file mode 100644 index 00000000..4c856bb7 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/message/Query.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.message; + +import lombok.NoArgsConstructor; + +/** + * Query the other hosts for their identification. + */ +@NoArgsConstructor +public class Query extends Message { + + /** + * Constructor. + * + * @param source host on which the message originated + */ + public Query(String source) { + super(source); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java new file mode 100644 index 00000000..cafcb45b --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -0,0 +1,270 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import java.util.Arrays; +import java.util.TreeSet; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Offline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The active state. In this state, this host has one more bucket assignments and + * processes any events associated with one of its buckets. Other events are forwarded to + * appropriate target hosts. + */ +@Getter(AccessLevel.PROTECTED) +public class ActiveState extends ProcessingState { + + private static final Logger logger = LoggerFactory.getLogger(ActiveState.class); + + /** + * Set of hosts that have been assigned a bucket. + */ + @Getter(AccessLevel.NONE) + private final TreeSet<String> assigned = new TreeSet<>(); + + /** + * Host that comes after this host, or {@code null} if it has no successor. + */ + private String succHost = null; + + /** + * Host that comes before this host, or "" if it has no predecessor. + */ + private String predHost = ""; + + /** + * {@code True} if we saw this host's heart beat since the last check, {@code false} + * otherwise. + */ + private boolean myHeartbeatSeen = false; + + /** + * {@code True} if we saw the predecessor's heart beat since the last check, + * {@code false} otherwise. + */ + private boolean predHeartbeatSeen = false; + + + /** + * Constructor. + * + * @param mgr pooling manager + */ + public ActiveState(PoolingManager mgr) { + super(mgr, mgr.getAssignments().getLeader()); + + assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray())); + + detmNeighbors(); + } + + /** + * Determine this host's neighbors based on the order of the host UUIDs. Updates + * {@link #succHost} and {@link #predHost}. + */ + private void detmNeighbors() { + if (assigned.size() < 2) { + logger.info("this host has no neighbors on topic {}", getTopic()); + /* + * this host is the only one with any assignments - it has no neighbors + */ + succHost = null; + predHost = ""; + return; + } + + if ((succHost = assigned.higher(getHost())) == null) { + // wrapped around - successor is the first host in the set + succHost = assigned.first(); + } + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); + + if ((predHost = assigned.lower(getHost())) == null) { + // wrapped around - predecessor is the last host in the set + predHost = assigned.last(); + } + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); + } + + @Override + public void start() { + super.start(); + addTimers(); + genHeartbeat(); + } + + /** + * Adds the timers. + */ + private void addTimers() { + logger.info("add timers"); + + /* + * heart beat generator + */ + long genMs = getProperties().getInterHeartbeatMs(); + + scheduleWithFixedDelay(genMs, genMs, () -> { + genHeartbeat(); + return null; + }); + + /* + * my heart beat checker + */ + long waitMs = getProperties().getActiveHeartbeatMs(); + + scheduleWithFixedDelay(waitMs, waitMs, () -> { + if (myHeartbeatSeen) { + myHeartbeatSeen = false; + return null; + } + + // missed my heart beat + logger.error("missed my heartbeat on topic {}", getTopic()); + + return missedHeartbeat(); + }); + + /* + * predecessor heart beat checker + */ + if (!predHost.isEmpty()) { + + scheduleWithFixedDelay(waitMs, waitMs, () -> { + if (predHeartbeatSeen) { + predHeartbeatSeen = false; + return null; + } + + // missed the predecessor's heart beat + logger.warn("missed predecessor's heartbeat on topic {}", getTopic()); + + publish(makeQuery()); + + return goQuery(); + }); + } + } + + /** + * Generates a heart beat for this host and its successor. + */ + private void genHeartbeat() { + var msg = makeHeartbeat(System.currentTimeMillis()); + publish(getHost(), msg); + + if (succHost != null) { + publish(succHost, msg); + } + } + + @Override + public State process(Heartbeat msg) { + String src = msg.getSource(); + + if (src == null) { + logger.warn("Heartbeat message has no source on topic {}", getTopic()); + + } else if (src.equals(getHost())) { + logger.info("saw my heartbeat on topic {}", getTopic()); + myHeartbeatSeen = true; + + } else if (src.equals(predHost)) { + logger.info("saw heartbeat from {} on topic {}", src, getTopic()); + predHeartbeatSeen = true; + + } else { + logger.info("ignored heartbeat message from {} on topic {}", src, getTopic()); + } + + return null; + } + + @Override + public State process(Leader msg) { + if (!isValid(msg)) { + return null; + } + + String src = msg.getSource(); + + if (getHost().compareTo(src) < 0) { + // our host would be a better leader - find out what's up + logger.warn("unexpected Leader message from {} on topic {}", src, getTopic()); + return goQuery(); + } + + logger.info("have a new leader {} on topic {}", src, getTopic()); + + return goActive(msg.getAssignments()); + } + + @Override + public State process(Offline msg) { + String src = msg.getSource(); + + if (src == null) { + logger.warn("Offline message has no source on topic {}", getTopic()); + return null; + + } else if (!assigned.contains(src)) { + /* + * the offline host wasn't assigned any buckets, so just ignore the message + */ + logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic()); + return null; + + } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) { + /* + * Case 1: We are the leader. + * + * Case 2: Our predecessor was the leader, and it has gone offline - we should + * become the leader. + * + * In either case, we are now the leader, and we must re-balance the buckets + * since one of the hosts has gone offline. + */ + + logger.info("Offline message from source {} on topic {}", src, getTopic()); + + assigned.remove(src); + + return becomeLeader(assigned); + + } else { + /* + * Otherwise, we don't care right now - we'll wait for the leader to tell us + * it's been removed. + */ + logger.info("ignore Offline message from source {} on topic {}", src, getTopic()); + return null; + } + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java new file mode 100644 index 00000000..06418280 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import org.onap.policy.drools.pooling.PoolingManager; + +/** + * Idle state, used when offline. + */ +public class IdleState extends State { + + public IdleState(PoolingManager mgr) { + super(mgr); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java new file mode 100644 index 00000000..7fc220a0 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The inactive state. In this state, we just wait a bit and then try to re-activate. In + * the meantime, all messages are ignored. + */ +public class InactiveState extends State { + + private static final Logger logger = LoggerFactory.getLogger(InactiveState.class); + + /** + * Constructor. + * + * @param mgr pooling manager + */ + public InactiveState(PoolingManager mgr) { + super(mgr); + } + + @Override + public void start() { + super.start(); + schedule(getProperties().getReactivateMs(), this::goStart); + } + + @Override + public State process(Leader msg) { + if (isValid(msg)) { + logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); + return goActive(msg.getAssignments()); + } + + return null; + } + + /** + * Generates an Identification message and goes to the query state. + */ + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); + } + + /** + * Remains in this state, without resetting any timers. + */ + @Override + protected State goInactive() { + return null; + } + +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java new file mode 100644 index 00000000..76914b75 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -0,0 +1,398 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Any state in which events are being processed locally and forwarded, as appropriate. + */ +@Setter +@Getter +public class ProcessingState extends State { + + private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class); + + /** + * Current known leader, never {@code null}. + */ + @NonNull + private String leader; + + /** + * Constructor. + * + * @param mgr pooling manager + * @param leader current known leader, which need not be the same as the assignment + * leader. Never {@code null} + * @throws IllegalArgumentException if an argument is invalid + */ + public ProcessingState(PoolingManager mgr, @NonNull String leader) { + super(mgr); + + BucketAssignments assignments = mgr.getAssignments(); + + if (assignments != null) { + String[] arr = assignments.getHostArray(); + if (arr != null && arr.length == 0) { + throw new IllegalArgumentException("zero-length bucket assignments"); + } + } + + this.leader = leader; + } + + /** + * Generates an Identification message and goes to the query state. + */ + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); + } + + /** + * Sets the assignments. + * + * @param assignments new assignments, or {@code null} + */ + protected final void setAssignments(BucketAssignments assignments) { + if (assignments != null) { + startDistributing(assignments); + } + } + + /** + * Determines if this host is the leader, based on the current assignments. + * + * @return {@code true} if this host is the leader, {@code false} otherwise + */ + public boolean isLeader() { + return getHost().equals(leader); + } + + /** + * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}. + * + * @param alive hosts that are known to be alive + * + * @return the new state + */ + protected State becomeLeader(SortedSet<String> alive) { + String newLeader = getHost(); + + if (!newLeader.equals(alive.first())) { + throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first()); + } + + var msg = makeLeader(alive); + logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size()); + + publish(msg); + + return goActive(msg.getAssignments()); + } + + /** + * Makes a leader message. Assumes "this" host is the leader, and thus appears as the + * first host in the set of hosts that are still alive. + * + * @param alive hosts that are known to be alive + * + * @return a new message + */ + private Leader makeLeader(Set<String> alive) { + return new Leader(getHost(), makeAssignments(alive)); + } + + /** + * Makes a set of bucket assignments. Assumes "this" host is the leader. + * + * @param alive hosts that are known to be alive + * + * @return a new set of bucket assignments + */ + private BucketAssignments makeAssignments(Set<String> alive) { + + // make a working array from the CURRENT assignments + String[] bucket2host = makeBucketArray(); + + TreeSet<String> avail = new TreeSet<>(alive); + + // if we have more hosts than buckets, then remove the extra hosts + removeExcessHosts(bucket2host.length, avail); + + // create a host bucket for each available host + Map<String, HostBucket> host2hb = new HashMap<>(); + avail.forEach(host -> host2hb.put(host, new HostBucket(host))); + + // add bucket indices to the appropriate host bucket + addIndicesToHostBuckets(bucket2host, host2hb); + + // convert the collection back to an array + fillArray(host2hb.values(), bucket2host); + + // update bucket2host with new assignments + rebalanceBuckets(host2hb.values(), bucket2host); + + return new BucketAssignments(bucket2host); + } + + /** + * Makes a bucket array, copying the current assignments, if available. + * + * @return a new bucket array + */ + private String[] makeBucketArray() { + BucketAssignments asgn = getAssignments(); + if (asgn == null) { + return new String[BucketAssignments.MAX_BUCKETS]; + } + + String[] oldArray = asgn.getHostArray(); + if (oldArray.length == 0) { + return new String[BucketAssignments.MAX_BUCKETS]; + } + + var newArray = new String[oldArray.length]; + System.arraycopy(oldArray, 0, newArray, 0, oldArray.length); + + return newArray; + } + + /** + * Removes excess hosts from the set of available hosts. Assumes "this" host is the + * leader, and thus appears as the first host in the set. + * + * @param maxHosts maximum number of hosts to be retained + * @param avail available hosts + */ + private void removeExcessHosts(int maxHosts, SortedSet<String> avail) { + while (avail.size() > maxHosts) { + /* + * Don't remove this host, as it's the leader. Since the leader is always at + * the front of the sorted set, we'll just pick off hosts from the back of the + * set. + */ + String host = avail.last(); + avail.remove(host); + + logger.warn("not using extra host {} for topic {}", host, getTopic()); + } + } + + /** + * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or + * assigned to a host that does not appear within the map are re-assigned to a host + * that appears within the map. + * + * @param bucket2host bucket assignments + * @param host2data maps a host name to its {@link HostBucket} + */ + private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) { + LinkedList<Integer> nullBuckets = new LinkedList<>(); + + for (var x = 0; x < bucket2host.length; ++x) { + String host = bucket2host[x]; + if (host == null) { + nullBuckets.add(x); + + } else { + HostBucket hb = host2data.get(host); + if (hb == null) { + nullBuckets.add(x); + + } else { + hb.add(x); + } + } + } + + // assign the null buckets to other hosts + assignNullBuckets(nullBuckets, host2data.values()); + } + + /** + * Assigns null buckets (i.e., those having no assignment) to available hosts. + * + * @param buckets buckets that still need to be assigned to hosts + * @param coll collection of current host-bucket assignments + */ + private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) { + // assign null buckets to the hosts with the fewest buckets + TreeSet<HostBucket> assignments = new TreeSet<>(coll); + + for (Integer index : buckets) { + // add it to the host with the shortest bucket list + HostBucket newhb = assignments.pollFirst(); + assert newhb != null; + newhb.add(index); + + // put the item back into the queue, with its new count + assignments.add(newhb); + } + } + + /** + * Re-balances the buckets, taking from those that have a larger count and giving to + * those that have a smaller count. Populates an output array with the new + * assignments. + * + * @param coll current bucket assignment + * @param bucket2host array to be populated with the new assignments + */ + private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) { + if (coll.size() <= 1) { + // only one hosts - nothing to rebalance + return; + } + + TreeSet<HostBucket> assignments = new TreeSet<>(coll); + + for (;;) { + HostBucket smaller = assignments.pollFirst(); + HostBucket larger = assignments.pollLast(); + + assert larger != null && smaller != null; + if (larger.size() - smaller.size() <= 1) { + // it's as balanced as it will get + break; + } + + // move the bucket from the larger to the smaller + Integer bucket = larger.remove(); + smaller.add(bucket); + + bucket2host[bucket] = smaller.host; + + // put the items back, with their new counts + assignments.add(larger); + assignments.add(smaller); + } + + } + + /** + * Fills the array with the host assignments. + * + * @param coll the host assignments + * @param bucket2host array to be filled + */ + private void fillArray(Collection<HostBucket> coll, String[] bucket2host) { + for (HostBucket hb : coll) { + for (Integer index : hb.buckets) { + bucket2host[index] = hb.host; + } + } + } + + /** + * Tracks buckets that have been assigned to a host. + */ + protected static class HostBucket implements Comparable<HostBucket> { + /** + * Host to which the buckets have been assigned. + */ + private String host; + + /** + * Buckets that have been assigned to this host. + */ + private Queue<Integer> buckets = new LinkedList<>(); + + /** + * Constructor. + * + * @param host host + */ + public HostBucket(String host) { + this.host = host; + } + + /** + * Removes the next bucket from the list. + * + * @return the next bucket + */ + public final Integer remove() { + return buckets.remove(); + } + + /** + * Adds a bucket to the list. + * + * @param index index of the bucket to add + */ + public final void add(Integer index) { + buckets.add(index); + } + + /** + * Size. + * + * @return the number of buckets assigned to this host + */ + public final int size() { + return buckets.size(); + } + + /** + * Compares host buckets, first by the number of buckets, and then by the host + * name. + */ + @Override + public final int compareTo(HostBucket other) { + int diff = buckets.size() - other.buckets.size(); + if (diff == 0) { + diff = host.compareTo(other.host); + } + return diff; + } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java new file mode 100644 index 00000000..ef401dcb --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -0,0 +1,204 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import java.util.TreeSet; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Offline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Query state. In this state, the host waits for the other hosts to identify + * themselves. Eventually, a leader should come forth. If not, it will transition to the + * active or inactive state, depending on whether it has an assignment in the + * current bucket assignments. The other possibility is that it may <i>become</i> the + * leader, in which case it will also transition to the active state. + */ +public class QueryState extends ProcessingState { + + private static final Logger logger = LoggerFactory.getLogger(QueryState.class); + + /** + * Hosts that have sent an "Identification" message. Always includes this host. + */ + private final TreeSet<String> alive = new TreeSet<>(); + + /** + * {@code True} if we saw our own Identification method, {@code false} otherwise. + */ + private boolean sawSelfIdent = false; + + /** + * Constructor. + * + * @param mgr manager + */ + public QueryState(PoolingManager mgr) { + // this host is the leader, until a better candidate identifies itself + super(mgr, mgr.getHost()); + + alive.add(getHost()); + } + + @Override + public void start() { + super.start(); + + // start identification timer + awaitIdentification(); + } + + /** + * Starts a timer to wait for all Identification messages to arrive. + */ + private void awaitIdentification() { + + /* + * Once we've waited long enough for all Identification messages to arrive, become + * the leader, assuming we should. + */ + + schedule(getProperties().getIdentificationMs(), () -> { + + if (!sawSelfIdent) { + // didn't see our identification + logger.error("missed our own Ident message on topic {}", getTopic()); + return missedHeartbeat(); + + } else if (isLeader()) { + // "this" host is the new leader + logger.info("this host is the new leader for topic {}", getTopic()); + return becomeLeader(alive); + + } else { + // not the leader - return to previous state + logger.info("no new leader on topic {}", getTopic()); + return goActive(getAssignments()); + } + }); + } + + @Override + public State goQuery() { + return null; + } + + @Override + public State process(Identification msg) { + + if (getHost().equals(msg.getSource())) { + logger.info("saw our own Ident message on topic {}", getTopic()); + sawSelfIdent = true; + + } else { + logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic()); + recordInfo(msg.getSource(), msg.getAssignments()); + } + + return null; + } + + /** + * If the message leader is better than the leader we have, then go active with it. + * Otherwise, simply treat it like an {@link Identification} message. + */ + @Override + public State process(Leader msg) { + if (!isValid(msg)) { + return null; + } + + String source = msg.getSource(); + BucketAssignments asgn = msg.getAssignments(); + + // go active, if this has a leader that's the same or better than the one we have + if (source.compareTo(getLeader()) <= 0) { + logger.warn("leader with {} on topic {}", source, getTopic()); + return goActive(asgn); + } + + /* + * The message does not have an acceptable leader, but we'll still record its + * info. + */ + logger.info("record leader info from {} on topic {}", source, getTopic()); + recordInfo(source, asgn); + + return null; + } + + @Override + public State process(Offline msg) { + String host = msg.getSource(); + + if (host != null && !host.equals(getHost())) { + logger.warn("host {} offline on topic {}", host, getTopic()); + alive.remove(host); + setLeader(alive.first()); + + } else { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + } + + return null; + } + + /** + * Records info from a message, adding the source host name to {@link #alive}, and + * updating the bucket assignments. + * + * @param source the message's source host + * @param assignments assignments, or {@code null} + */ + private void recordInfo(String source, BucketAssignments assignments) { + // add this message's source host to "alive" + if (source != null) { + alive.add(source); + setLeader(alive.first()); + } + + if (assignments == null || assignments.getLeader() == null) { + return; + } + + // record assignments, if we don't have any yet + BucketAssignments current = getAssignments(); + if (current == null) { + logger.info("received initial assignments on topic {}", getTopic()); + setAssignments(assignments); + return; + } + + /* + * Record assignments, if the new assignments have a better (i.e., lesser) leader. + */ + String curldr = current.getLeader(); + if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) { + logger.info("use new assignments from {} on topic {}", source, getTopic()); + setAssignments(assignments); + } + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java new file mode 100644 index 00000000..73717d7c --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import lombok.Getter; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The start state. Upon entry, a heart beat is generated and the event filter is changed + * to look for just that particular message. Once the message is seen, it goes into the + * {@link QueryState}. + */ +@Getter +public class StartState extends State { + + private static final Logger logger = LoggerFactory.getLogger(StartState.class); + + /** + * Time stamp inserted into the heart beat message. + */ + private long hbTimestampMs = System.currentTimeMillis(); + + /** + * Constructor. + * + * @param mgr pooling manager + */ + public StartState(PoolingManager mgr) { + super(mgr); + } + + @Override + public void start() { + + super.start(); + + var hb = makeHeartbeat(hbTimestampMs); + publish(getHost(), hb); + + /* + * heart beat generator + */ + long genMs = getProperties().getInterHeartbeatMs(); + + scheduleWithFixedDelay(genMs, genMs, () -> { + publish(getHost(), hb); + return null; + }); + + /* + * my heart beat checker + */ + schedule(getProperties().getStartHeartbeatMs(), () -> { + logger.error("missed heartbeat on topic {}", getTopic()); + return internalTopicFailed(); + }); + } + + /** + * Transitions to the query state if the heart beat originated from this host and its + * time stamp matches. + */ + @Override + public State process(Heartbeat msg) { + if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) { + // saw our own heart beat - transition to query state + logger.info("saw our own heartbeat on topic {}", getTopic()); + publish(makeQuery()); + return goQuery(); + + } else { + logger.info("ignored old heartbeat message from {} on topic {}", msg.getSource(), getTopic()); + } + + return null; + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java new file mode 100644 index 00000000..e2cf9586 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -0,0 +1,373 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +import java.util.LinkedList; +import java.util.List; +import org.onap.policy.drools.pooling.CancellableScheduledTask; +import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.PoolingProperties; +import org.onap.policy.drools.pooling.message.BucketAssignments; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A state in the finite state machine. + * + * <p>A state may have several timers associated with it, which must be cancelled whenever + * the state is changed. Assumes that timers are not continuously added to the same state. + */ +public abstract class State { + + private static final Logger logger = LoggerFactory.getLogger(State.class); + + /** + * Host pool manager. + */ + private final PoolingManager mgr; + + /** + * Timers added by this state. + */ + private final List<CancellableScheduledTask> timers = new LinkedList<>(); + + /** + * Constructor. + * + * @param mgr pooling manager + */ + protected State(PoolingManager mgr) { + this.mgr = mgr; + } + + /** + * Cancels the timers added by this state. + */ + public final void cancelTimers() { + timers.forEach(CancellableScheduledTask::cancel); + } + + /** + * Starts the state. The default method simply logs a message and returns. + */ + public void start() { + logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic()); + } + + /** + * Transitions to the "start" state. + * + * @return the new state + */ + public final State goStart() { + return mgr.goStart(); + } + + /** + * Transitions to the "query" state. + * + * @return the new state + */ + public State goQuery() { + return mgr.goQuery(); + } + + /** + * Goes active with a new set of assignments. + * + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment + */ + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn != null && asgn.hasAssignment(getHost())) { + return mgr.goActive(); + + } else { + return goInactive(); + } + } + + /** + * Transitions to the "inactive" state. + * + * @return the new state + */ + protected State goInactive() { + return mgr.goInactive(); + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Heartbeat msg) { + logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Identification msg) { + logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method copies the assignments and then returns + * {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Leader msg) { + if (isValid(msg)) { + logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + } + + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Determines if a message is valid and did not originate from this host. + * + * @param msg message to be validated + * @return {@code true} if the message is valid, {@code false} otherwise + */ + protected boolean isValid(Leader msg) { + BucketAssignments asgn = msg.getAssignments(); + if (asgn == null) { + logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic()); + return false; + } + + // ignore Leader messages from ourself + String source = msg.getSource(); + if (source == null || source.equals(getHost())) { + logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic()); + return false; + } + + // the new leader must equal the source + boolean result = source.equals(asgn.getLeader()); + + if (!result) { + logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic()); + } + + return result; + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Identification msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Leader msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Offline msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message. + * + * @param msg message to be published + */ + protected final void publish(Query msg) { + mgr.publishAdmin(msg); + } + + /** + * Publishes a message on the specified channel. + * + * @param channel channel + * @param msg message to be published + */ + protected final void publish(String channel, Heartbeat msg) { + mgr.publish(channel, msg); + } + + /** + * Starts distributing messages using the specified bucket assignments. + * + * @param assignments assignments + */ + protected final void startDistributing(BucketAssignments assignments) { + if (assignments != null) { + mgr.startDistributing(assignments); + } + } + + /** + * Schedules a timer to fire after a delay. + * + * @param delayMs delay in ms + * @param task task + */ + protected final void schedule(long delayMs, StateTimerTask task) { + timers.add(mgr.schedule(delayMs, task)); + } + + /** + * Schedules a timer to fire repeatedly. + * + * @param initialDelayMs initial delay ms + * @param delayMs delay ms + * @param task task + */ + protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); + } + + /** + * Indicates that we failed to see our own heartbeat; must be a problem with the + * internal topic. Assumes the problem is temporary and continues to use the current + * bucket assignments. + * + * @return a new {@link StartState} + */ + protected final State missedHeartbeat() { + publish(makeOffline()); + + return mgr.goStart(); + } + + /** + * Indicates that the internal topic failed; this should only be invoked from the + * StartState. Discards bucket assignments and begins processing everything locally. + * + * @return a new {@link InactiveState} + */ + protected final State internalTopicFailed() { + publish(makeOffline()); + mgr.startDistributing(null); + + return mgr.goInactive(); + } + + /** + * Makes a heart beat message. + * + * @param timestampMs time, in milliseconds, associated with the message + * + * @return a new message + */ + protected final Heartbeat makeHeartbeat(long timestampMs) { + return new Heartbeat(getHost(), timestampMs); + } + + /** + * Makes an Identification message. + * + * @return a new message + */ + protected Identification makeIdentification() { + return new Identification(getHost(), getAssignments()); + } + + /** + * Makes an "offline" message. + * + * @return a new message + */ + protected final Offline makeOffline() { + return new Offline(getHost()); + } + + /** + * Makes a query message. + * + * @return a new message + */ + protected final Query makeQuery() { + return new Query(getHost()); + } + + public final BucketAssignments getAssignments() { + return mgr.getAssignments(); + } + + public final String getHost() { + return mgr.getHost(); + } + + public final String getTopic() { + return mgr.getTopic(); + } + + public final PoolingProperties getProperties() { + return mgr.getProperties(); + } +} diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java new file mode 100644 index 00000000..892a1767 --- /dev/null +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling.state; + +/** + * Task to be executed when a timer fires within a {@link State}. + */ +@FunctionalInterface +public interface StateTimerTask { + + /** + * Fires the timer. + * + * @return the new state, or {@code null} if the state is unchanged + */ + State fire(); + +} diff --git a/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.DroolsControllerFeatureApi b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.DroolsControllerFeatureApi new file mode 100644 index 00000000..cd59e469 --- /dev/null +++ b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.DroolsControllerFeatureApi @@ -0,0 +1 @@ +org.onap.policy.drools.pooling.PoolingFeature diff --git a/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyControllerFeatureApi b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyControllerFeatureApi new file mode 100644 index 00000000..cd59e469 --- /dev/null +++ b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyControllerFeatureApi @@ -0,0 +1 @@ +org.onap.policy.drools.pooling.PoolingFeature diff --git a/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi new file mode 100644 index 00000000..cd59e469 --- /dev/null +++ b/feature-pooling-messages/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi @@ -0,0 +1 @@ +org.onap.policy.drools.pooling.PoolingFeature |