aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-03-26 16:48:31 -0400
committerJim Hahn <jrh3@att.com>2018-03-28 23:47:53 -0400
commita3fa1c69a955af57f4e9023488bac3ef67a4fc3e (patch)
tree0f5173ea23c5d40cdef0f64dffc3fc18e695cf64 /feature-pooling-dmaap/src/main/java
parent1d2c8346e0ac02320ca933b66c1943c7f72343c6 (diff)
Add pooling capability
Add an optional feature that that supports session pooling, wherein more than one host can be active at a time. Use beforeInsert() instead of beforeOffer(), where possible. Move request-id-extraction from policy-managment to feature-pooling. Combined AdditionalProperties into PoolingProperties. Finished junit tests for DmaapManager. Adjusted filters for all XxxState classes, and added testGetFilter to all XxxStateTest classes. Always publish Offline message when the internal topic fails. Remove DelayedExtractor, as it isn't needed. Renamed ExtractorMap to ClassExtractors, and added property name prefix to the constructor to give more control over property naming to invokers. Remove State copy constructor. Use class name instead of class in ClassExtractors map. Remove BucketAssignments from ProcessingState. Remove some TODO items. Add META-INF for implemented feature APIs. Fix ClassExtractor bug where it can't find a field in a superclass, and add a test for classes defined in another file. Add assembly and rename project directory. Add more junit coverage. Change-Id: I7f132f84a7b284a58ab09c9069db19b853acd7e9 Issue-ID: POLICY-577 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java294
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java121
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java144
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java390
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java50
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java152
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java871
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java162
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java79
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java109
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java466
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java35
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java60
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java58
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java215
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java180
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java60
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java45
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java75
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java103
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java77
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java45
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java44
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java255
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java85
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java55
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java410
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java209
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java132
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java370
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java37
35 files changed, 5651 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
new file mode 100644
index 00000000..98543f29
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -0,0 +1,294 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.List;
+import java.util.Properties;
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the internal DMaaP topic.
+ */
+public class DmaapManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
+
+ /**
+ * Factory used to construct objects.
+ */
+ private static Factory factory = new Factory();
+
+ /**
+ * Name of the DMaaP topic.
+ */
+ private final String topic;
+
+ /**
+ * Topic source whose filter is to be manipulated.
+ */
+ private final FilterableTopicSource topicSource;
+
+ /**
+ * Where to publish messages.
+ */
+ private final TopicSink topicSink;
+
+ /**
+ * Topic sources. In theory, there's only one item in this list, the
+ * internal DMaaP topic.
+ */
+ private final List<TopicSource> sources;
+
+ /**
+ * Topic sinks. In theory, there's only one item in this list, the internal
+ * DMaaP topic.
+ */
+ private final List<TopicSink> sinks;
+
+ /**
+ * {@code True} if the consumer is running, {@code false} otherwise.
+ */
+ private boolean consuming = false;
+
+ /**
+ * {@code True} if the publisher is running, {@code false} otherwise.
+ */
+ private boolean publishing = false;
+
+ /**
+ * Constructs the manager, but does not start the source or sink.
+ *
+ * @param topic name of the internal DMaaP topic
+ * @param props properties to configure the topic source & sink
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+
+ logger.info("initializing bus for topic {}", topic);
+
+ try {
+ this.topic = topic;
+ this.sources = factory.initTopicSources(props);
+ this.sinks = factory.initTopicSinks(props);
+
+ this.topicSource = findTopicSource();
+ this.topicSink = findTopicSink();
+
+ // verify that we can set the filter
+ setFilter(null);
+
+ } catch (IllegalArgumentException e) {
+ logger.error("failed to attach to topic {}", topic);
+ throw new PoolingFeatureException(e);
+ }
+ }
+
+ protected static Factory getFactory() {
+ return factory;
+ }
+
+ /**
+ * Used by junit tests to set the factory used to create various objects
+ * used by this class.
+ *
+ * @param factory the new factory
+ */
+ protected static void setFactory(Factory factory) {
+ DmaapManager.factory = factory;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * Finds the topic source associated with the internal DMaaP topic.
+ *
+ * @return the topic source
+ * @throws PoolingFeatureException if the source doesn't exist or is not
+ * filterable
+ */
+ private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ for (TopicSource src : sources) {
+ if (topic.equals(src.getTopic())) {
+ if (src instanceof FilterableTopicSource) {
+ return (FilterableTopicSource) src;
+
+ } else {
+ throw new PoolingFeatureException("topic source " + topic + " is not filterable");
+ }
+ }
+ }
+
+ throw new PoolingFeatureException("missing topic source " + topic);
+ }
+
+ /**
+ * Finds the topic sink associated with the internal DMaaP topic.
+ *
+ * @return the topic sink
+ * @throws PoolingFeatureException if the sink doesn't exist
+ */
+ private TopicSink findTopicSink() throws PoolingFeatureException {
+ for (TopicSink sink : sinks) {
+ if (topic.equals(sink.getTopic())) {
+ return sink;
+ }
+ }
+
+ throw new PoolingFeatureException("missing topic sink " + topic);
+ }
+
+ /**
+ * Starts the publisher, if it isn't already running.
+ *
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public void startPublisher() throws PoolingFeatureException {
+ if (publishing) {
+ return;
+ }
+
+ try {
+ topicSink.start();
+ publishing = true;
+
+ } catch (IllegalStateException e) {
+ throw new PoolingFeatureException("cannot start topic sink " + topic, e);
+ }
+ }
+
+ /**
+ * Stops the publisher.
+ */
+ public void stopPublisher() {
+ if (!publishing) {
+ return;
+ }
+
+ try {
+ publishing = false;
+ topicSink.stop();
+
+ } catch (IllegalStateException e) {
+ logger.error("cannot stop sink for topic {}", topic, e);
+ }
+ }
+
+ /**
+ * Starts the consumer, if it isn't already running.
+ *
+ * @param listener listener to register with the source
+ */
+ public void startConsumer(TopicListener listener) {
+ if (consuming) {
+ return;
+ }
+
+ topicSource.register(listener);
+ consuming = true;
+ }
+
+ /**
+ * Stops the consumer.
+ *
+ * @param listener listener to unregister with the source
+ */
+ public void stopConsumer(TopicListener listener) {
+ if (!consuming) {
+ return;
+ }
+
+ consuming = false;
+ topicSource.unregister(listener);
+ }
+
+ /**
+ * Sets the server-side filter to be used by the consumer.
+ *
+ * @param filter the filter string, or {@code null} if no filter is to be
+ * used
+ * @throws PoolingFeatureException if the topic is not filterable
+ */
+ public void setFilter(String filter) throws PoolingFeatureException {
+ try {
+ topicSource.setFilter(filter);
+
+ } catch (UnsupportedOperationException e) {
+ throw new PoolingFeatureException("cannot filter topic " + topic);
+ }
+ }
+
+ /**
+ * Publishes a message to the sink.
+ *
+ * @param msg message to be published
+ * @throws PoolingFeatureException if an error occurs or the publisher isn't
+ * running
+ */
+ public void publish(String msg) throws PoolingFeatureException {
+ if (!publishing) {
+ throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
+ }
+
+ try {
+ if (!topicSink.send(msg)) {
+ throw new PoolingFeatureException("failed to send to topic sink " + topic);
+ }
+
+ } catch (IllegalStateException e) {
+ throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
+ }
+ }
+
+ /**
+ * Factory used to construct objects.
+ */
+ public static class Factory {
+
+ /**
+ * Initializes the topic sources.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sources
+ */
+ public List<TopicSource> initTopicSources(Properties props) {
+ return TopicEndpoint.manager.addTopicSources(props);
+ }
+
+ /**
+ * Initializes the topic sinks.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sinks
+ */
+ public List<TopicSink> initTopicSinks(Properties props) {
+ return TopicEndpoint.manager.addTopicSinks(props);
+ }
+
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
new file mode 100644
index 00000000..0bed85b5
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Finite queue of events waiting to be processed once the buckets have been
+ * assigned.
+ */
+public class EventQueue {
+
+ private static final Logger logger = LoggerFactory.getLogger(EventQueue.class);
+
+ /**
+ * Maximum number of events allowed in the queue. When excess events are
+ * added, the older events are removed.
+ */
+ private int maxEvents;
+
+ /**
+ * Maximum age, in milliseconds, of events in the queue. Events that are
+ * older than this are discarded rather than being handed off when
+ * {@link #poll()} is invoked.
+ */
+ private long maxAgeMs;
+
+ /**
+ * The actual queue of events.
+ */
+ private Deque<Forward> events = new LinkedList<>();
+
+ /**
+ *
+ * @param maxEvents maximum number of events to hold in the queue
+ * @param maxAgeMs maximum age of events in the queue
+ */
+ public EventQueue(int maxEvents, long maxAgeMs) {
+ this.maxEvents = maxEvents;
+ this.maxAgeMs = maxAgeMs;
+ }
+
+ /**
+ *
+ * @return {@code true} if the queue is empty, {@code false} otherwise
+ */
+ public boolean isEmpty() {
+ return events.isEmpty();
+ }
+
+ /**
+ * Clears the queue.
+ */
+ public void clear() {
+ events.clear();
+ }
+
+ /**
+ *
+ * @return the number of elements in the queue
+ */
+ public int size() {
+ return events.size();
+ }
+
+ /**
+ * Adds an item to the queue. If the queue is full, the older item is
+ * removed and discarded.
+ *
+ * @param event
+ */
+ public void add(Forward event) {
+ if (events.size() >= maxEvents) {
+ logger.warn("full queue - discarded event for topic {}", event.getTopic());
+ events.remove();
+ }
+
+ events.add(event);
+ }
+
+ /**
+ * Gets the oldest, un-expired event from the queue.
+ *
+ * @return the oldest, un-expired event
+ */
+ public Forward poll() {
+ long tmin = System.currentTimeMillis() - maxAgeMs;
+
+ Forward ev;
+ while ((ev = events.poll()) != null) {
+ if (!ev.isExpired(tmin)) {
+ break;
+ }
+ }
+
+ return ev;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java
new file mode 100644
index 00000000..d2f32043
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java
@@ -0,0 +1,144 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+// TODO move to policy-utils
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.Properties;
+import org.onap.policy.common.utils.properties.SpecPropertyConfiguration;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+/**
+ * Checks whether or not a feature is enabled. The name of the "enable" property
+ * is assumed to be of the form accepted by a {@link SpecPropertyConfiguration},
+ * which contains a substitution place-holder into which a "specializer" (e.g.,
+ * controller or session name) is substituted.
+ */
+public class FeatureEnabledChecker {
+
+ /**
+ *
+ */
+ private FeatureEnabledChecker() {
+ super();
+ }
+
+ /**
+ * Determines if a feature is enabled for a particular specializer.
+ *
+ * @param props properties from which to extract the "enabled" flag
+ * @param specializer specializer to be substituted into the property name
+ * when extracting
+ * @param propName the name of the "enabled" property
+ * @return {@code true} if the feature is enabled, or {@code false} if it is
+ * not enabled (or if the property doesn't exist)
+ * @throws IllegalArgumentException if the "enabled" property is not a
+ * boolean value
+ */
+ public static boolean isFeatureEnabled(Properties props, String specializer, String propName) {
+
+ try {
+ return new Config(specializer, props, propName).isEnabled();
+
+ } catch (PropertyException e) {
+ throw new IllegalArgumentException("cannot check property " + propName, e);
+ }
+ }
+
+
+ /**
+ * Configuration used to extract the value.
+ */
+ private static class Config extends SpecPropertyConfiguration {
+
+ /**
+ * There is a bit of trickery here. This annotation is just a
+ * place-holder to get the superclass to invoke the
+ * {@link #setValue(java.lang.reflect.Field, Properties, Property)
+ * setValue()} method. When that's invoked, we'll substitute
+ * {@link #propOverride} instead of this annotation.
+ */
+ @Property(name = "feature-enabled-property-place-holder")
+ private boolean enabled;
+
+ /**
+ * Annotation that will actually be used to set the field.
+ */
+ private Property propOverride;
+
+ /**
+ *
+ * @param specializer specializer to be substituted into the property
+ * name when extracting
+ * @param props properties from which to extract the "enabled" flag
+ * @param propName the name of the "enabled" property
+ * @throws PropertyException if an error occurs
+ */
+ public Config(String specializer, Properties props, String propName) throws PropertyException {
+ super(specializer);
+
+ propOverride = new Property() {
+
+ @Override
+ public String name() {
+ return propName;
+ }
+
+ @Override
+ public String defaultValue() {
+ // feature is disabled by default
+ return "false";
+ }
+
+ @Override
+ public String accept() {
+ return "";
+ }
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return Property.class;
+ }
+ };
+
+ setAllFields(props);
+ }
+
+ /**
+ * Substitutes {@link #propOverride} for "prop".
+ */
+ @Override
+ protected boolean setValue(Field field, Properties props, Property prop) throws PropertyException {
+ return super.setValue(field, props, propOverride);
+ }
+
+ /**
+ *
+ * @return {@code true} if the feature is enabled, {@code false}
+ * otherwise
+ */
+ public boolean isEnabled() {
+ return enabled;
+ }
+ };
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
new file mode 100644
index 00000000..da47a031
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -0,0 +1,390 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.core.PolicySessionFeatureAPI;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
+import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.utils.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controller/session pooling. Multiple hosts may be launched, all servicing the
+ * same controllers/sessions. When this feature is enabled, the requests are
+ * divided across the different hosts, instead of all running on a single,
+ * active host.
+ * <p>
+ * With each controller, there is an associated DMaaP topic that is used for
+ * internal communication between the different hosts serving the controller.
+ */
+public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
+
+ // TODO state-management doesn't allow more than one active host at a time
+
+ /**
+ * Factory used to create objects.
+ */
+ private static Factory factory;
+
+ /**
+ * Entire set of feature properties, including those specific to various
+ * controllers.
+ */
+ private Properties featProps = null;
+
+ /**
+ * Maps a controller name to its associated manager.
+ */
+ private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+ /**
+ * Arguments passed to beforeOffer(), which are saved for when the
+ * beforeInsert() is called later. As multiple threads can be active within
+ * the methods at the same time, we must keep this in thread local storage.
+ */
+ private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+
+ /**
+ *
+ */
+ public PoolingFeature() {
+ super();
+ }
+
+ protected static Factory getFactory() {
+ return factory;
+ }
+
+ /**
+ * Sets the factory to be used to create objects. Used by junit tests.
+ *
+ * @param factory the new factory to be used to create objects
+ */
+ protected static void setFactory(Factory factory) {
+ PoolingFeature.factory = factory;
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 0;
+ }
+
+ /**
+ * @throws PoolingFeatureRtException if the properties cannot be read or are
+ * invalid
+ */
+ @Override
+ public void globalInit(String[] args, String configDir) {
+ logger.info("initializing pooling feature");
+
+ try {
+ featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties");
+
+ } catch (IOException ex) {
+ throw new PoolingFeatureRtException(ex);
+ }
+ }
+
+ /**
+ * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
+ *
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ @Override
+ public boolean afterCreate(PolicyController controller) {
+
+ if (featProps == null) {
+ logger.error("pooling feature properties have not been loaded");
+ throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
+ }
+
+ String name = controller.getName();
+
+ if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) {
+ try {
+ // get & validate the properties
+ PoolingProperties props = new PoolingProperties(name, featProps);
+
+ logger.info("pooling enabled for {}", name);
+ ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
+
+ } catch (PropertyException e) {
+ logger.error("pooling disabled due to exception for {}", name, e);
+ throw new PoolingFeatureRtException(e);
+ }
+
+ } else {
+ logger.info("pooling disabled for {}", name);
+ }
+
+
+ return false;
+ }
+
+ @Override
+ public boolean beforeStart(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeStart();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterStart(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.afterStart();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean beforeStop(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeStop();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterStop(PolicyController controller) {
+
+ // NOTE: using doDeleteManager() instead of doManager()
+
+ return doDeleteManager(controller, mgr -> {
+
+ mgr.afterStop();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean beforeLock(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeLock();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean afterUnlock(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.afterUnlock();
+ return false;
+ });
+ }
+
+ @Override
+ public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
+ /*
+ * As this is invoked a lot, we'll directly call the manager's method
+ * instead of using the functional interface via doManager().
+ */
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ if (mgr.beforeOffer(protocol, topic2, event)) {
+ return true;
+ }
+
+ offerArgs.set(new OfferArgs(protocol, topic2, event));
+ return false;
+ }
+
+ @Override
+ public boolean beforeInsert(DroolsController droolsController, Object fact) {
+
+ OfferArgs args = offerArgs.get();
+ if (args == null) {
+ return false;
+ }
+
+ PolicyController controller;
+ try {
+ controller = factory.getController(droolsController);
+
+ } catch (IllegalArgumentException | IllegalStateException e) {
+ return false;
+ }
+
+ if (controller == null) {
+ return false;
+ }
+
+ /*
+ * As this is invoked a lot, we'll directly call the manager's method
+ * instead of using the functional interface via doManager().
+ */
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ }
+
+ @Override
+ public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
+ boolean success) {
+
+ // clear any stored arguments
+ offerArgs.set(null);
+
+ return false;
+ }
+
+ /**
+ * Executes a function using the manager associated with the controller.
+ * Catches any exceptions from the function and re-throws it as a runtime
+ * exception.
+ *
+ * @param controller
+ * @param func function to be executed
+ * @return {@code true} if the function handled the request, {@code false}
+ * otherwise
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ private boolean doManager(PolicyController controller, MgrFunc func) {
+ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
+ if (mgr == null) {
+ return false;
+ }
+
+ try {
+ return func.apply(mgr);
+
+ } catch (PoolingFeatureException e) {
+ throw e.toRuntimeException();
+ }
+ }
+
+ /**
+ * Executes a function using the manager associated with the controller and
+ * then deletes the manager. Catches any exceptions from the function and
+ * re-throws it as a runtime exception.
+ *
+ * @param controller
+ * @param func function to be executed
+ * @return {@code true} if the function handled the request, {@code false}
+ * otherwise
+ * @throws PoolingFeatureRtException if an error occurs
+ */
+ private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
+
+ // NOTE: using "remove()" instead of "get()"
+
+ PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName());
+
+ if (mgr == null) {
+ return false;
+ }
+
+ return func.apply(mgr);
+ }
+
+ /**
+ * Function that operates on a manager.
+ */
+ @FunctionalInterface
+ private static interface MgrFunc {
+
+ /**
+ *
+ * @param mgr
+ * @return {@code true} if the request was handled by the manager,
+ * {@code false} otherwise
+ * @throws PoolingFeatureException
+ */
+ public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
+ }
+
+ /**
+ * Arguments captured from beforeOffer().
+ */
+ private static class OfferArgs {
+
+ /**
+ * Protocol of the receiving topic.
+ */
+ private CommInfrastructure protocol;
+
+ /**
+ * Topic on which the event was received.
+ */
+ private String topic;
+
+ /**
+ * The event text that was received on the topic.
+ */
+ private String event;
+
+ /**
+ *
+ * @param protocol
+ * @param topic
+ * @param event the actual event data received on the topic
+ */
+ public OfferArgs(CommInfrastructure protocol, String topic, String event) {
+ this.protocol = protocol;
+ this.topic = topic;
+ this.event = event;
+ }
+ }
+
+ /**
+ * Used to create objects.
+ */
+ public static class Factory {
+
+ /**
+ * Makes a pooling manager for a controller.
+ *
+ * @param controller
+ * @param props properties to use to configure the manager
+ * @return a new pooling manager
+ */
+ public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
+ return new PoolingManagerImpl(controller, props);
+ }
+
+ /**
+ * Gets the policy controller associated with a drools controller.
+ *
+ * @param droolsController
+ * @return the policy controller associated with a drools controller
+ */
+ public PolicyController getController(DroolsController droolsController) {
+ return PolicyController.factory.get(droolsController);
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
new file mode 100644
index 00000000..5efd1414
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+/**
+ * Exception thrown by the pooling feature.
+ */
+public class PoolingFeatureException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public PoolingFeatureException() {
+ super();
+ }
+
+ public PoolingFeatureException(String message) {
+ super(message);
+ }
+
+ public PoolingFeatureException(Throwable cause) {
+ super(cause);
+ }
+
+ public PoolingFeatureException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PoolingFeatureException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ /**
+ * Converts the exception to a runtime exception.
+ *
+ * @return a new runtime exception, wrapping this exception
+ */
+ public PoolingFeatureRtException toRuntimeException() {
+ return new PoolingFeatureRtException(this);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java
new file mode 100644
index 00000000..6fdb6c69
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureRtException.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+/**
+ * A runtime exception thrown by the pooling feature.
+ */
+public class PoolingFeatureRtException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public PoolingFeatureRtException() {
+ super();
+ }
+
+ public PoolingFeatureRtException(String message) {
+ super(message);
+ }
+
+ public PoolingFeatureRtException(Throwable cause) {
+ super(cause);
+ }
+
+ public PoolingFeatureRtException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PoolingFeatureRtException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
new file mode 100644
index 00000000..de08d1e1
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -0,0 +1,152 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+
+/**
+ * Pooling manager for a single PolicyController.
+ */
+public interface PoolingManager {
+
+ /**
+ * Gets the properties used to configure the manager.
+ *
+ * @return
+ */
+ public PoolingProperties getProperties();
+
+ /**
+ * Gets the host id.
+ *
+ * @return the host id
+ */
+ public String getHost();
+
+ /**
+ * Gets the name of the internal DMaaP topic used by this manager to
+ * communicate with its other hosts.
+ *
+ * @return the name of the internal DMaaP topic
+ */
+ public String getTopic();
+
+ /**
+ * Indicates that communication with internal DMaaP topic failed, typically
+ * due to a missed heart beat. Stops the PolicyController.
+ *
+ * @return a latch that can be used to determine when the controller's
+ * stop() method has completed
+ */
+ public CountDownLatch internalTopicFailed();
+
+ /**
+ * Starts distributing requests according to the given bucket assignments.
+ *
+ * @param assignments must <i>not</i> be {@code null}
+ */
+ public void startDistributing(BucketAssignments assignments);
+
+ /**
+ * Gets the current bucket assignments.
+ *
+ * @return the current bucket assignments, or {@code null} if no assignments
+ * have been made
+ */
+ public BucketAssignments getAssignments();
+
+ /**
+ * Publishes a message to the internal topic on the administrative channel.
+ *
+ * @param msg message to be published
+ */
+ public void publishAdmin(Message msg);
+
+ /**
+ * Publishes a message to the internal topic on a particular channel.
+ *
+ * @param channel channel on which the message should be published
+ * @param msg message to be published
+ */
+ public void publish(String channel, Message msg);
+
+ /**
+ * Handles a {@link Forward} event that was received from the internal
+ * topic.
+ *
+ * @param event
+ */
+ public void handle(Forward event);
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs delay, in milliseconds
+ * @param task
+ * @return a future that can be used to cancel the timer
+ */
+ public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task);
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs initial delay, in milliseconds
+ * @param delayMs delay, in milliseconds
+ * @param task
+ * @return a future that can be used to cancel the timer
+ */
+ public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task);
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ public State goStart();
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ public State goQuery();
+
+ /**
+ * Transitions to the "active" state.
+ *
+ * @return the new state
+ */
+ public State goActive();
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ public State goInactive();
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
new file mode 100644
index 00000000..cd71670d
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -0,0 +1,871 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.pooling.extractor.ClassExtractors;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
+import org.onap.policy.drools.system.PolicyController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have
+ * been made, events coming from external topics are saved in a queue for later
+ * processing. Once assignments are made, the saved events are processed. In
+ * addition, while the controller is locked, events are still forwarded to other
+ * hosts and bucket assignments are still updated, based on any {@link Leader}
+ * messages that it receives.
+ */
+public class PoolingManagerImpl implements PoolingManager, TopicListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
+
+ // TODO metrics, audit logging
+
+ /**
+ * Maximum number of times a message can be forwarded.
+ */
+ public static final int MAX_HOPS = 5;
+
+ /**
+ * Type of item that the extractors will be extracting.
+ */
+ private static final String EXTRACTOR_TYPE = "requestId";
+
+ /**
+ * Prefix for extractor properties.
+ */
+ private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
+
+ /**
+ * Factory used to create various objects. Can be overridden during junit
+ * testing.
+ */
+ private static Factory factory = new Factory();
+
+ /**
+ * ID of this host.
+ */
+ private final String host;
+
+ /**
+ * Properties with which this was configured.
+ */
+ private final PoolingProperties props;
+
+ /**
+ * Associated controller.
+ */
+ private final PolicyController controller;
+
+ /**
+ * Where to offer events that have been forwarded to this host (i.e, the
+ * controller).
+ */
+ private final TopicListener listener;
+
+ /**
+ * Used to encode & decode request objects received from & sent to a rule
+ * engine.
+ */
+ private final Serializer serializer;
+
+ /**
+ * Internal DMaaP topic used by this controller.
+ */
+ private final String topic;
+
+ /**
+ * Manager for the internal DMaaP topic.
+ */
+ private final DmaapManager dmaapMgr;
+
+ /**
+ * Used to extract the request id from the decoded message.
+ */
+ private final ClassExtractors extractors;
+
+ /**
+ * Lock used while updating {@link #current}. In general, public methods
+ * must use this, while private methods assume the lock is already held.
+ */
+ private final Object curLocker = new Object();
+
+ /**
+ * Current state.
+ * <p>
+ * This uses a finite state machine, wherein the state object contains all
+ * of the data relevant to that state. Each state object has a process()
+ * method, specific to each type of {@link Message} subclass. The method
+ * returns the next state object, or {@code null} if the state is to remain
+ * the same.
+ */
+ private State current;
+
+ /**
+ * Current bucket assignments or {@code null}.
+ */
+ private BucketAssignments assignments = null;
+
+ /**
+ * Pool used to execute timers.
+ */
+ private ScheduledThreadPoolExecutor scheduler = null;
+
+ /**
+ * Queue used when no bucket assignments are available.
+ */
+ private EventQueue eventq;
+
+ /**
+ * {@code True} if events offered by the controller should be intercepted,
+ * {@code false} otherwise.
+ */
+ private boolean intercept = true;
+
+ /**
+ * Constructs the manager, initializing all of the data structures.
+ *
+ * @param controller controller with which this is associated
+ * @param props feature properties specific to the controller
+ */
+ public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
+ this.host = UUID.randomUUID().toString();
+ this.controller = controller;
+ this.props = props;
+
+ try {
+ this.listener = (TopicListener) controller;
+ this.serializer = new Serializer();
+ this.topic = props.getPoolingTopic();
+ this.eventq = factory.makeEventQueue(props);
+
+ SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName());
+ this.extractors = factory.makeClassExtractors(spec);
+
+ this.dmaapMgr = factory.makeDmaapManager(props);
+ this.current = new IdleState(this);
+
+ logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
+
+ } catch (ClassCastException e) {
+ logger.error("not a topic listener, controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ throw e.toRuntimeException();
+ }
+ }
+
+ protected static Factory getFactory() {
+ return factory;
+ }
+
+ protected static void setFactory(Factory factory) {
+ PoolingManagerImpl.factory = factory;
+ }
+
+ /**
+ * Should only be used by junit tests.
+ *
+ * @return the current state
+ */
+ protected State getCurrent() {
+ synchronized (curLocker) {
+ return current;
+ }
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public PoolingProperties getProperties() {
+ return props;
+ }
+
+ /**
+ * Indicates that the controller is about to start. Starts the publisher for
+ * the internal topic, and creates a thread pool for the timers.
+ *
+ * @throws PoolingFeatureException if the internal topic publisher cannot be
+ * started
+ */
+ public void beforeStart() throws PoolingFeatureException {
+ synchronized (curLocker) {
+ if (scheduler == null) {
+ dmaapMgr.startPublisher();
+
+ scheduler = factory.makeScheduler();
+
+ /*
+ * Only a handful of timers at any moment, thus we can afford to
+ * take the time to remove them when they're cancelled.
+ */
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setMaximumPoolSize(1);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller has successfully started. Starts the
+ * consumer for the internal topic, enters the {@link StartState}, and sets
+ * the filter for the initial state.
+ */
+ public void afterStart() {
+ synchronized (curLocker) {
+ if (current instanceof IdleState) {
+ dmaapMgr.startConsumer(this);
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to stop. Stops the consumer, the
+ * scheduler, and the current state.
+ */
+ public void beforeStop() {
+ ScheduledThreadPoolExecutor sched;
+
+ synchronized (curLocker) {
+ sched = scheduler;
+ scheduler = null;
+
+ if (!(current instanceof IdleState)) {
+ dmaapMgr.stopConsumer(this);
+ changeState(new IdleState(this));
+
+ // TODO
+ /*
+ * Need a brief delay here to allow "offline" message to be
+ * transmitted?
+ */
+ }
+ }
+
+ if (sched != null) {
+ sched.shutdownNow();
+ }
+ }
+
+ /**
+ * Indicates that the controller has stopped. Stops the publisher and logs a
+ * warning if any events are still in the queue.
+ */
+ public void afterStop() {
+ synchronized (curLocker) {
+ if (!eventq.isEmpty()) {
+ logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
+ eventq.clear();
+ }
+
+ dmaapMgr.stopPublisher();
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to be locked. Enters the idle
+ * state, as all it will be doing is forwarding messages.
+ */
+ public void beforeLock() {
+ synchronized (curLocker) {
+ changeState(new IdleState(this));
+ }
+ }
+
+ /**
+ * Indicates that the controller has been unlocked. Enters the start state,
+ * if the controller is running.
+ */
+ public void afterUnlock() {
+ synchronized (curLocker) {
+ if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Changes the finite state machine to a new state, provided the new state
+ * is not {@code null}.
+ *
+ * @param newState new state, or {@code null} if to remain unchanged
+ */
+ private void changeState(State newState) {
+ if (newState != null) {
+ current.cancelTimers();
+ current = newState;
+
+ // set the filter before starting the state
+ setFilter(newState.getFilter());
+ newState.start();
+ }
+ }
+
+ /**
+ * Sets the server-side filter for the internal topic.
+ *
+ * @param filter new filter to be used
+ */
+ private void setFilter(Map<String, Object> filter) {
+ try {
+ dmaapMgr.setFilter(serializer.encodeFilter(filter));
+
+ } catch (JsonProcessingException e) {
+ logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
+ }
+ }
+
+ @Override
+ public CountDownLatch internalTopicFailed() {
+ logger.error("communication failed for topic {}", topic);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ /*
+ * We don't want to build up items in our queue if we can't forward them
+ * to other hosts, so we just stop the controller.
+ *
+ * Use a background thread to prevent deadlocks.
+ */
+ new Thread() {
+ @Override
+ public void run() {
+ controller.stop();
+ latch.countDown();
+ }
+ }.start();
+
+ return latch;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) {
+ return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void publishAdmin(Message msg) {
+ publish(Message.ADMIN, msg);
+ }
+
+ @Override
+ public void publish(String channel, Message msg) {
+ msg.setChannel(channel);
+
+ try {
+ // ensure it's valid before we send it
+ msg.checkValidity();
+
+ String txt = serializer.encodeMsg(msg);
+ dmaapMgr.publish(txt);
+
+ } catch (JsonProcessingException e) {
+ logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic.
+ *
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * controller should handle it
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
+
+ if (event == null) {
+ logger.error("null event on topic {}", topic);
+ return;
+ }
+
+ synchronized (curLocker) {
+ // it's on the internal topic
+ handleInternal(event);
+ }
+ }
+
+ /**
+ * Called by the PolicyController before it offers the event to the
+ * DroolsController. If the controller is locked, then it isn't processing
+ * events. However, they still need to be forwarded, thus in that case, they
+ * are decoded and forwarded.
+ * <p>
+ * On the other hand, if the controller is not locked, then we just return
+ * immediately and let {@link #beforeInsert(Object, String, String, Object)
+ * beforeInsert()} handle it instead, as it already has the decoded message.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+
+ if (!controller.isLocked() || !intercept) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ }
+
+ /**
+ * Called by the DroolsController before it inserts the event into the rule
+ * engine.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event original event text, as received from the Bus
+ * @param event2 event, as an object
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
+
+ if (!intercept) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @param reqid request id extracted from the event, or {@code null} if it
+ * couldn't be extracted
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
+ if (reqid == null) {
+ // no request id - let the invoker handle it
+ return false;
+ }
+
+ if (reqid.isEmpty()) {
+ logger.warn("handle locally due to empty request id for topic {}", topic2);
+ // no request id - let the invoker handle it
+ return false;
+ }
+
+ Forward ev = makeForward(protocol, topic2, event, reqid);
+ if (ev == null) {
+ // invalid args - consume the message
+ return true;
+ }
+
+ synchronized (curLocker) {
+ return handleExternal(ev);
+ }
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * invoker should handle it
+ */
+ private boolean handleExternal(Forward event) {
+ if (assignments == null) {
+ // no bucket assignments yet - add it to the queue
+ eventq.add(event);
+
+ // we've consumed the event
+ return true;
+
+ } else {
+ return handleEvent(event);
+ }
+ }
+
+ /**
+ * Handles a {@link Forward} event, possibly forwarding it again.
+ *
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * invoker should handle it
+ */
+ private boolean handleEvent(Forward event) {
+ int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size();
+ String target = assignments.getAssignedHost(bucket);
+
+ if (target == null) {
+ /*
+ * This bucket has no assignment - just discard the event
+ */
+ return true;
+ }
+
+ if (target.equals(host)) {
+ /*
+ * Message belongs to this host - allow the controller to handle it.
+ */
+ return false;
+ }
+
+ // forward to a different host, if hop count has been exhausted
+ if (event.getNumHops() > MAX_HOPS) {
+ logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
+ topic);
+
+ } else {
+ event.bumpNumHops();
+ publish(target, event);
+ }
+
+ // either way, consume the event
+ return true;
+ }
+
+ /**
+ * Extract the request id from an event object.
+ *
+ * @param event the event object, or {@code null}
+ * @return the event's request id, or {@code null} if it can't be extracted
+ */
+ private String extractRequestId(Object event) {
+ if (event == null) {
+ return null;
+ }
+
+ Object reqid = extractors.extract(event);
+ return (reqid != null ? reqid.toString() : null);
+ }
+
+ /**
+ * Decodes an event from a String into an event Object.
+ *
+ * @param topic2
+ * @param event
+ * @return the decoded event object, or {@code null} if it can't be decoded
+ */
+ private Object decodeEvent(String topic2, String event) {
+ DroolsController drools = controller.getDrools();
+
+ // check if this topic has a decoder
+
+ if (!factory.canDecodeEvent(drools, topic2)) {
+
+ logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
+ drools.getArtifactId());
+ return null;
+ }
+
+ // decode
+
+ try {
+ return factory.decodeEvent(drools, topic2, event);
+
+ } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
+ logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Makes a {@link Forward}, and validates its contents.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @param reqid
+ * @return a new message, or {@code null} if the message was invalid
+ */
+ private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
+ try {
+ Forward ev = new Forward(host, protocol, topic2, event, reqid);
+
+ // required for the validity check
+ ev.setChannel(host);
+
+ ev.checkValidity();
+
+ return ev;
+
+ } catch (PoolingFeatureException e) {
+ logger.error("invalid message for topic {}", topic2, e);
+ return null;
+ }
+ }
+
+ @Override
+ public void handle(Forward event) {
+ synchronized (curLocker) {
+ if (!handleExternal(event)) {
+ // this host should handle it - inject it
+ inject(event);
+ }
+ }
+ }
+
+ /**
+ * Injects an event into the controller.
+ *
+ * @param event
+ */
+ private void inject(Forward event) {
+ intercept = false;
+ listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+
+ intercept = true;
+ }
+
+ /**
+ * Handles an event from the internal topic. This uses reflection to
+ * identify the appropriate process() method to invoke, based on the type of
+ * Message that was decoded.
+ *
+ * @param event the serialized {@link Message} read from the internal topic
+ */
+ private void handleInternal(String event) {
+ Class<?> clazz = null;
+
+ try {
+ Message msg = serializer.decodeMsg(event);
+
+ // get the class BEFORE checking the validity
+ clazz = msg.getClass();
+
+ msg.checkValidity();
+
+ Method meth = current.getClass().getMethod("process", msg.getClass());
+ changeState((State) meth.invoke(current, msg));
+
+ } catch (IOException e) {
+ logger.warn("failed to decode message for topic {}", topic, e);
+
+ } catch (NoSuchMethodException | SecurityException e) {
+ logger.error("no processor for message {} for topic {}", clazz, topic, e);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+ }
+ }
+
+ @Override
+ public void startDistributing(BucketAssignments assignments) {
+ if (assignments == null) {
+ return;
+ }
+
+ synchronized (curLocker) {
+ this.assignments = assignments;
+
+ // now that we have assignments, we can process the queue
+ Forward ev;
+ while ((ev = eventq.poll()) != null) {
+ handle(ev);
+ }
+ }
+ }
+
+ @Override
+ public BucketAssignments getAssignments() {
+ return assignments;
+ }
+
+ @Override
+ public State goStart() {
+ return new StartState(this);
+ }
+
+ @Override
+ public State goQuery() {
+ return new QueryState(this);
+ }
+
+ @Override
+ public State goActive() {
+ return new ActiveState(this);
+ }
+
+ @Override
+ public State goInactive() {
+ return new InactiveState(this);
+ }
+
+ /**
+ * Action to run a timer task. Only runs the task if the machine is still in
+ * the state that it was in when the timer was created.
+ */
+ private class TimerAction implements Runnable {
+
+ /**
+ * State of the machine when the timer was created.
+ */
+ private State origState;
+
+ /**
+ * Task to be executed.
+ */
+ private StateTimerTask task;
+
+ /**
+ *
+ * @param task task to execute when this timer runs
+ */
+ public TimerAction(StateTimerTask task) {
+ this.origState = current;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ synchronized (curLocker) {
+ if (current == origState) {
+ changeState(task.fire(null));
+ }
+ }
+ }
+ }
+
+ /**
+ * Factory used to create objects.
+ */
+ public static class Factory {
+
+ /**
+ * Creates an event queue.
+ *
+ * @param props properties used to configure the event queue
+ * @return a new event queue
+ */
+ public EventQueue makeEventQueue(PoolingProperties props) {
+ return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
+ }
+
+ /**
+ * Creates object extractors.
+ *
+ * @param props properties used to configure the extractors
+ * @return a new set of extractors
+ */
+ public ClassExtractors makeClassExtractors(Properties props) {
+ return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE);
+ }
+
+ /**
+ * Creates a DMaaP manager.
+ *
+ * @param props properties used to configure the manager
+ * @return a new DMaaP manager
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
+ return new DmaapManager(props.getPoolingTopic(), props.getSource());
+ }
+
+ /**
+ * Creates a scheduled thread pool.
+ *
+ * @return a new scheduled thread pool
+ */
+ public ScheduledThreadPoolExecutor makeScheduler() {
+ return new ScheduledThreadPoolExecutor(1);
+ }
+
+ /**
+ * Determines if the event can be decoded.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @return {@code true} if the event can be decoded, {@code false}
+ * otherwise
+ */
+ public boolean canDecodeEvent(DroolsController drools, String topic) {
+ return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
+ }
+
+ /**
+ * Decodes the event.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @param event event text to be decoded
+ * @return the decoded event
+ * @throws IllegalArgumentException
+ * @throw UnsupportedOperationException
+ * @throws IllegalStateException
+ */
+ public Object decodeEvent(DroolsController drools, String topic, String event) {
+ return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
new file mode 100644
index 00000000..1cbe5cb9
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -0,0 +1,162 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.Properties;
+import org.onap.policy.common.utils.properties.SpecPropertyConfiguration;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+/**
+ * Properties used by the pooling feature, specific to a controller.
+ */
+public class PoolingProperties extends SpecPropertyConfiguration {
+
+ /**
+ * Feature properties all begin with this prefix.
+ */
+ public static final String PREFIX = "pooling.";
+
+ /*
+ * These properties REQUIRE a controller name, thus they use the "{$}" form.
+ */
+ public static final String POOLING_TOPIC = PREFIX + "{$}.topic";
+
+ /*
+ * These properties allow the controller name to be left out, thus they use
+ * the "{prefix?suffix}" form.
+ */
+ public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled";
+ public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit";
+ public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds";
+ public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds";
+ public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds";
+ public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
+ public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
+ public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
+
+ /**
+ * Properties from which this was constructed.
+ */
+ private Properties source;
+
+ /**
+ * Topic used for inter-host communication.
+ */
+ @Property(name = POOLING_TOPIC)
+ private String poolingTopic;
+
+ /**
+ * Maximum number of events to retain in the queue while waiting for
+ * buckets to be assigned.
+ */
+ @Property(name = OFFLINE_LIMIT, defaultValue = "1000")
+ private int offlineLimit;
+
+ /**
+ * Maximum age, in milliseconds, of events to be retained in the queue.
+ * Events older than this are discarded.
+ */
+ @Property(name = OFFLINE_AGE_MS, defaultValue = "60000")
+ private long offlineAgeMs;
+
+ /**
+ * Time, in milliseconds, to wait for this host's heart beat during the
+ * start-up state.
+ */
+ @Property(name = START_HEARTBEAT_MS, defaultValue = "50000")
+ private long startHeartbeatMs;
+
+ /**
+ * Time, in milliseconds, to wait before attempting to re-active this
+ * host when it has no bucket assignments.
+ */
+ @Property(name = REACTIVATE_MS, defaultValue = "50000")
+ private long reactivateMs;
+
+ /**
+ * Time, in milliseconds, to wait for all Identification messages to
+ * arrive during the query state.
+ */
+ @Property(name = IDENTIFICATION_MS, defaultValue = "50000")
+ private long identificationMs;
+
+ /**
+ * Time, in milliseconds, to wait for heart beats from this host, or its
+ * predecessor, during the active state.
+ */
+ @Property(name = ACTIVE_HEARTBEAT_MS, defaultValue = "50000")
+ private long activeHeartbeatMs;
+
+ /**
+ * Time, in milliseconds, to wait between heart beat generations during
+ * the active state.
+ */
+ @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
+ private long interHeartbeatMs;
+
+ /**
+ * @param controllerName the name of the controller
+ * @param props set of properties used to configure this
+ * @throws PropertyException if an error occurs
+ *
+ */
+ public PoolingProperties(String controllerName, Properties props) throws PropertyException {
+ super(controllerName, props);
+
+ source = props;
+ }
+
+ public Properties getSource() {
+ return source;
+ }
+
+ public String getPoolingTopic() {
+ return poolingTopic;
+ }
+
+ public int getOfflineLimit() {
+ return offlineLimit;
+ }
+
+ public long getOfflineAgeMs() {
+ return offlineAgeMs;
+ }
+
+ public long getStartHeartbeatMs() {
+ return startHeartbeatMs;
+ }
+
+ public long getReactivateMs() {
+ return reactivateMs;
+ }
+
+ public long getIdentificationMs() {
+ return identificationMs;
+ }
+
+ public long getActiveHeartbeatMs() {
+ return activeHeartbeatMs;
+ }
+
+ public long getInterHeartbeatMs() {
+ return interHeartbeatMs;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
new file mode 100644
index 00000000..63aefb7a
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.IOException;
+import java.util.Map;
+import org.onap.policy.drools.pooling.message.Message;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serialization helper functions.
+ */
+public class Serializer {
+
+ /**
+ * Used to encode & decode JSON messages sent & received, respectively, on
+ * the internal DMaaP topic.
+ */
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ *
+ */
+ public Serializer() {
+ super();
+ }
+
+ /**
+ * Encodes a filter.
+ *
+ * @param filter filter to be encoded
+ * @return the filter, serialized as a JSON string
+ * @throws JsonProcessingException if it cannot be serialized
+ */
+ public String encodeFilter(Map<String, Object> filter) throws JsonProcessingException {
+ return mapper.writeValueAsString(filter);
+ }
+
+ /**
+ * Encodes a message.
+ *
+ * @param msg message to be encoded
+ * @return the message, serialized as a JSON string
+ * @throws JsonProcessingException if it cannot be serialized
+ */
+ public String encodeMsg(Message msg) throws JsonProcessingException {
+ return mapper.writeValueAsString(msg);
+ }
+
+ /**
+ * Decodes a JSON string into a Message.
+ *
+ * @param msg JSON string representing the message
+ * @return the message
+ * @throws IOException if it cannot be serialized
+ */
+ public Message decodeMsg(String msg) throws IOException {
+ return mapper.readValue(msg, Message.class);
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
new file mode 100644
index 00000000..e4557404
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.util.Properties;
+
+/**
+ * Properties with an optional specialization (e.g., session name, controller
+ * name).
+ */
+public class SpecProperties extends Properties {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The property prefix, ending with ".".
+ */
+ private final String prefix;
+
+ /**
+ * The specialized property prefix, ending with ".".
+ */
+ private final String specPrefix;
+
+ /**
+ *
+ * @param prefix the property name prefix that appears before any
+ * specialization
+ * @param specialization the property name specialization (e.g., session
+ * name)
+ */
+ public SpecProperties(String prefix, String specialization) {
+ this.prefix = withTrailingDot(prefix);
+ this.specPrefix = withTrailingDot(this.prefix + specialization);
+ }
+
+ /**
+ *
+ * @param prefix the property name prefix that appears before any
+ * specialization
+ * @param specialization the property name specialization (e.g., session
+ * name)
+ * @param props the default properties
+ */
+ public SpecProperties(String prefix, String specialization, Properties props) {
+ super(props);
+
+ this.prefix = withTrailingDot(prefix);
+ this.specPrefix = withTrailingDot(this.prefix + specialization);
+ }
+
+ /**
+ * Adds a trailing "." to a String, if it doesn't already have one.
+ *
+ * @param text text to which the "." should be added
+ * @return the text, with a trailing "."
+ */
+ private static String withTrailingDot(String text) {
+ return text.endsWith(".") ? text : text + ".";
+ }
+
+ /**
+ * Gets the property whose value has the given key, looking first for the
+ * specialized property name, and then for the generalized property name.
+ *
+ * @param key property name, without the specialization
+ * @return the value from the property set, or {@code null} if the property
+ * set does not contain the value
+ */
+ public String getProperty(String key) {
+ if (!key.startsWith(prefix)) {
+ return super.getProperty(key);
+ }
+
+ String suffix = key.substring(prefix.length());
+
+ String val = super.getProperty(specPrefix + suffix);
+ if (val != null) {
+ return val;
+ }
+
+ return super.getProperty(key);
+ }
+
+ protected String getPrefix() {
+ return prefix;
+ }
+
+ protected String getSpecPrefix() {
+ return specPrefix;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
new file mode 100644
index 00000000..cb12a6ac
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -0,0 +1,466 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang.StringUtils;
+import org.onap.policy.drools.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extractors for each object class. Properties define how the data is to be
+ * extracted for a given class, where the properties are similar to the
+ * following:
+ *
+ * <pre>
+ * <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
+ * </pre>
+ *
+ * If it doesn't find a property for the class, then it looks for a property for
+ * that class' super class or interfaces. Extractors are compiled and cached.
+ */
+public class ClassExtractors {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
+
+ /**
+ * Properties that specify how the data is to be extracted from a given
+ * class.
+ */
+ private final Properties properties;
+
+ /**
+ * Property prefix, including a trailing ".".
+ */
+ private final String prefix;
+
+ /**
+ * Type of item to be extracted.
+ */
+ private final String type;
+
+ /**
+ * Maps the class name to its extractor.
+ */
+ private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
+
+ /**
+ *
+ * @param props properties that specify how the data is to be extracted from
+ * a given class
+ * @param prefix property name prefix, prepended before the class name
+ * @param type type of item to be extracted
+ */
+ public ClassExtractors(Properties props, String prefix, String type) {
+ this.properties = props;
+ this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
+ this.type = type;
+ }
+
+ /**
+ * Gets the number of extractors in the map.
+ *
+ * @return gets the number of extractors in the map
+ */
+ protected int size() {
+ return class2extractor.size();
+ }
+
+ /**
+ * Extracts the desired data item from an object.
+ *
+ * @param object object from which to extract the data item
+ * @return the extracted item, or {@code null} if it could not be extracted
+ */
+ public Object extract(Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ Extractor ext = getExtractor(object);
+
+ return ext.extract(object);
+ }
+
+ /**
+ * Gets the extractor for the given type of object, creating one if it
+ * doesn't exist yet.
+ *
+ * @param object object whose extracted is desired
+ * @return an extractor for the object
+ */
+ private Extractor getExtractor(Object object) {
+ Class<?> clazz = object.getClass();
+ Extractor ext = class2extractor.get(clazz.getName());
+
+ if (ext == null) {
+ // allocate a new extractor, if another thread doesn't beat us to it
+ ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
+ }
+
+ return ext;
+ }
+
+ /**
+ * Builds an extractor for the class.
+ *
+ * @param clazz class for which the extractor should be built
+ *
+ * @return a new extractor
+ */
+ private Extractor buildExtractor(Class<?> clazz) {
+ String value = properties.getProperty(prefix + clazz.getName(), null);
+ if (value != null) {
+ // property has config info for this class - build the extractor
+ return buildExtractor(clazz, value);
+ }
+
+ /*
+ * Get the extractor, if any, for the super class or interfaces, but
+ * don't add one if it doesn't exist
+ */
+ Extractor ext = getClassExtractor(clazz, false);
+ if (ext != null) {
+ return ext;
+ }
+
+ /*
+ * No extractor defined for for this class or its super class - we
+ * cannot extract data items from objects of this type, so just
+ * allocated a null extractor.
+ */
+ logger.warn("missing property " + prefix + clazz.getName());
+ return new NullExtractor();
+ }
+
+ /**
+ * Builds an extractor for the class, based on the config value extracted
+ * from the corresponding property.
+ *
+ * @param clazz class for which the extractor should be built
+ * @param value config value (e.g., "${event.request.id}"
+ * @return a new extractor
+ */
+ private Extractor buildExtractor(Class<?> clazz, String value) {
+ if (!value.startsWith("${")) {
+ logger.warn("property value for " + prefix + clazz.getName() + " does not start with '${'");
+ return new NullExtractor();
+ }
+
+ if (!value.endsWith("}")) {
+ logger.warn("property value for " + prefix + clazz.getName() + " does not end with '}'");
+ return new NullExtractor();
+ }
+
+ // get the part in the middle
+ String val = value.substring(2, value.length() - 1);
+ if (val.startsWith(".")) {
+ logger.warn("property value for " + prefix + clazz.getName() + " begins with '.'");
+ return new NullExtractor();
+ }
+
+ if (val.endsWith(".")) {
+ logger.warn("property value for " + prefix + clazz.getName() + " ends with '.'");
+ return new NullExtractor();
+ }
+
+ // everything's valid - create the extractor
+ try {
+ ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
+
+ /*
+ * If there's only one extractor, then just return it, otherwise
+ * return the whole extractor.
+ */
+ return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
+
+ } catch (ExtractorException e) {
+ logger.warn("cannot build extractor for " + clazz.getName());
+ return new NullExtractor();
+ }
+ }
+
+ /**
+ * Gets the extractor for a class, examining all super classes and
+ * interfaces.
+ *
+ * @param clazz class whose extractor is desired
+ * @param addOk {@code true} if the extractor may be added, provided the
+ * property is defined, {@code false} otherwise
+ * @return the extractor to be used for the class, or {@code null} if no
+ * extractor has been defined yet
+ */
+ private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
+ if (clazz == null) {
+ return null;
+ }
+
+ Extractor ext = null;
+
+ if (addOk) {
+ String val = properties.getProperty(prefix + clazz.getName(), null);
+
+ if (val != null) {
+ /*
+ * A property is defined for this class, so create the extractor
+ * for it.
+ */
+ return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
+ }
+ }
+
+ // see if the superclass has an extractor
+ if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
+ return ext;
+ }
+
+ // check the interfaces, too
+ for (Class<?> clz : clazz.getInterfaces()) {
+ if ((ext = getClassExtractor(clz, true)) != null) {
+ break;
+ }
+ }
+
+ return ext;
+ }
+
+ /**
+ * Extractor that always returns {@code null}. Used when no extractor could
+ * be built for a given object type.
+ */
+ private class NullExtractor implements Extractor {
+
+ @Override
+ public Object extract(Object object) {
+ logger.warn("cannot extract " + type + " from " + object.getClass());
+ return null;
+ }
+ }
+
+ /**
+ * Component-ized extractor. Extracts an object that is referenced
+ * hierarchically, where each name identifies a particular component within
+ * the hierarchy. Supports retrieval from {@link Map} objects, as well as
+ * via getXxx() methods, or by direct field retrieval.
+ * <p>
+ * Note: this will <i>not</i> work if POJOs are contained within a Map.
+ */
+ private class ComponetizedExtractor implements Extractor {
+
+ /**
+ * Extractor for each component.
+ */
+ private final Extractor[] extractors;
+
+ /**
+ *
+ * @param clazz the class associated with the object at the root of the
+ * hierarchy
+ * @param names name associated with each component
+ * @throws ExtractorException
+ */
+ public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
+ this.extractors = new Extractor[names.length];
+
+ Class<?> clz = clazz;
+
+ for (int x = 0; x < names.length; ++x) {
+ String comp = names[x];
+
+ Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
+
+ extractors[x] = pair.first();
+ clz = pair.second();
+ }
+ }
+
+ /**
+ * Builds an extractor for the given component of an object.
+ *
+ * @param clazz type of object from which the component will be
+ * extracted
+ * @param comp name of the component to extract
+ * @return a pair containing the extractor and the extracted object's
+ * type
+ * @throws ExtractorException
+ */
+ private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
+ Pair<Extractor, Class<?>> pair = null;
+
+ if (pair == null) {
+ pair = getMethodExtractor(clazz, comp);
+ }
+
+ if (pair == null) {
+ pair = getFieldExtractor(clazz, comp);
+ }
+
+ if (pair == null) {
+ pair = getMapExtractor(clazz, comp);
+ }
+
+
+ // didn't find an extractor
+ if (pair == null) {
+ throw new ExtractorException("class " + clazz + " contains no element " + comp);
+ }
+
+ return pair;
+ }
+
+ @Override
+ public Object extract(Object object) {
+ Object obj = object;
+
+ for (Extractor ext : extractors) {
+ if (obj == null) {
+ break;
+ }
+
+ obj = ext.extract(obj);
+ }
+
+ return obj;
+ }
+
+ /**
+ * Gets an extractor that invokes a getXxx() method to retrieve the
+ * object.
+ *
+ * @param clazz container's class
+ * @param name name of the property to be retrieved
+ * @return a new extractor, or {@code null} if the class does not
+ * contain the corresponding getXxx() method
+ * @throws ExtractorException if the getXxx() method is inaccessible
+ */
+ private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
+ Method meth;
+
+ String nm = "get" + StringUtils.capitalize(name);
+
+ try {
+ meth = clazz.getMethod(nm);
+
+ Class<?> retType = meth.getReturnType();
+ if (retType == void.class) {
+ // it's a void method, thus it won't return an object
+ return null;
+ }
+
+ return new Pair<>(new MethodExtractor(meth), retType);
+
+ } catch (NoSuchMethodException expected) {
+ // no getXxx() method, maybe there's a field by this name
+ return null;
+
+ } catch (SecurityException e) {
+ throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
+ }
+ }
+
+ /**
+ * Gets an extractor for a field within the object.
+ *
+ * @param clazz container's class
+ * @param name name of the field whose value is to be extracted
+ * @return a new extractor, or {@code null} if the class does not
+ * contain the given field
+ * @throws ExtractorException if the field is inaccessible
+ */
+ private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
+
+ Field field = getClassField(clazz, name);
+ if (field == null) {
+ return null;
+ }
+
+ return new Pair<>(new FieldExtractor(field), field.getType());
+ }
+
+ /**
+ * Gets an extractor for an item within a Map object.
+ *
+ * @param clazz container's class
+ * @param key item key within the map
+ * @return a new extractor, or {@code null} if the class is not a Map
+ * subclass
+ * @throws ExtractorException
+ */
+ private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) throws ExtractorException {
+
+ if (!Map.class.isAssignableFrom(clazz)) {
+ return null;
+ }
+
+ /*
+ * Don't know the value's actual type, so we'll assume it's a Map
+ * for now. Things should still work OK, as this is only used to
+ * direct the constructor on what type of extractor to create next.
+ * If the object turns out not to be a map, then the MapExtractor
+ * for the next component will just return null.
+ */
+ return new Pair<>(new MapExtractor(key), Map.class);
+ }
+
+ /**
+ * Gets field within a class, examining all super classes and
+ * interfaces.
+ *
+ * @param clazz class whose field is desired
+ * @param name name of the desired field
+ * @return the field within the class, or {@code null} if the field does
+ * not exist
+ * @throws ExtractorException if the field is inaccessible
+ */
+ private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
+ if (clazz == null) {
+ return null;
+ }
+
+ try {
+ return clazz.getDeclaredField(name);
+
+ } catch (NoSuchFieldException expected) {
+ // no field by this name - try super class & interfaces
+
+ } catch (SecurityException e) {
+ throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
+ }
+
+
+ Field field;
+
+ // see if the superclass has an extractor
+ if ((field = getClassField(clazz.getSuperclass(), name)) != null) {
+ return field;
+ }
+
+ // not necessary to check the interfaces
+
+ return field;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
new file mode 100644
index 00000000..9ed32ae4
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+/**
+ * Used to extract an object contained within another object.
+ */
+public interface Extractor {
+
+ /**
+ * Extracts an object contained within another object.
+ *
+ * @param object object from which to extract the contained object
+ * @return the extracted value, or {@code null} if it cannot be extracted
+ */
+ public Object extract(Object object);
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
new file mode 100644
index 00000000..a864672b
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+/**
+ * Exception generated by extractors.
+ */
+public class ExtractorException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ExtractorException() {
+ super();
+ }
+
+ public ExtractorException(String message) {
+ super(message);
+ }
+
+ public ExtractorException(Throwable cause) {
+ super(cause);
+ }
+
+ public ExtractorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
new file mode 100644
index 00000000..132b8ed0
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import java.lang.reflect.Field;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to extract an object stored in one of the container's fields.
+ */
+public class FieldExtractor implements Extractor {
+
+ private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
+
+ /**
+ * Field containing the object.
+ */
+ private final Field field;
+
+ /**
+ *
+ * @param field field containing the object
+ */
+ public FieldExtractor(Field field) {
+ this.field = field;
+
+ field.setAccessible(true);
+ }
+
+ @Override
+ public Object extract(Object object) {
+ try {
+ return field.get(object);
+
+ } catch (IllegalAccessException | IllegalArgumentException e) {
+ logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
+ return null;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
new file mode 100644
index 00000000..aff9d860
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to extract an object stored in a map.
+ */
+public class MapExtractor implements Extractor {
+
+ private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
+
+ /**
+ * Key to the item to extract from the map.
+ */
+ private final String key;
+
+ /**
+ *
+ * @param key key to the item to extract from the map
+ */
+ public MapExtractor(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public Object extract(Object object) {
+
+ if (object instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) object;
+
+ return map.get(key);
+
+ } else {
+ logger.warn("expecting a map, instead of " + object.getClass());
+ return null;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
new file mode 100644
index 00000000..20c4a1a7
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.extractor;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to extract an object by invoking a method on the container.
+ */
+public class MethodExtractor implements Extractor {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
+
+ /**
+ * Method to invoke to extract the contained object.
+ */
+ private final Method method;
+
+ /**
+ *
+ * @param method method to invoke to extract the contained object
+ */
+ public MethodExtractor(Method method) {
+ this.method = method;
+ }
+
+ @Override
+ public Object extract(Object object) {
+ try {
+ return method.invoke(object);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
+ return null;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
new file mode 100644
index 00000000..8fd86c1e
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -0,0 +1,215 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Bucket assignments, which is simply an array of host names.
+ */
+public class BucketAssignments {
+
+ @JsonIgnore
+ private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class);
+
+ /**
+ * Number of buckets.
+ */
+ public static final int MAX_BUCKETS = 1024;
+
+ /**
+ * Identifies the host serving a particular bucket.
+ */
+ private String[] hostArray = null;
+
+ /**
+ *
+ */
+ public BucketAssignments() {
+ super();
+ }
+
+ /**
+ *
+ * @param hostArray maps a bucket number (i.e., array index) to a host. All
+ * values must be non-null
+ */
+ public BucketAssignments(String[] hostArray) {
+ this.hostArray = hostArray;
+ }
+
+ public String[] getHostArray() {
+ return hostArray;
+ }
+
+ public void setHostArray(String[] hostArray) {
+ this.hostArray = hostArray;
+ }
+
+ /**
+ * Gets the leader, which is the host with the minimum UUID.
+ *
+ * @return the assignment leader
+ */
+ @JsonIgnore
+ public String getLeader() {
+ if (hostArray == null) {
+ return null;
+ }
+
+ String leader = null;
+
+ for (String host : hostArray) {
+ if (host != null && (leader == null || host.compareTo(leader) < 0)) {
+ leader = host;
+ }
+ }
+
+ return leader;
+
+ }
+
+ /**
+ * Determines if a host has an assignment.
+ *
+ * @param host host to be checked
+ * @return {@code true} if the host has an assignment, {@code false}
+ * otherwise
+ */
+ @JsonIgnore
+ public boolean hasAssignment(String host) {
+ if (hostArray == null) {
+ return false;
+ }
+
+ for (String host2 : hostArray) {
+ if (host.equals(host2)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Gets all of the hosts that have an assignment.
+ *
+ * @return all of the hosts that have an assignment
+ */
+ @JsonIgnore
+ public Set<String> getAllHosts() {
+ Set<String> set = new HashSet<>();
+ if (hostArray == null) {
+ return set;
+ }
+
+ for (String host : hostArray) {
+ if (host != null) {
+ set.add(host);
+ }
+ }
+
+ return set;
+ }
+
+ /**
+ * Gets the host assigned to a given bucket.
+ *
+ * @param bucket bucket number
+ * @return the assigned host, or {@code null} if the bucket has no assigned
+ * host
+ */
+ @JsonIgnore
+ public String getAssignedHost(int bucket) {
+ if (hostArray == null) {
+ logger.error("no buckets have been assigned");
+ return null;
+ }
+
+ if (bucket < 0 || bucket >= hostArray.length) {
+ logger.error("invalid bucket number {} maximum {}", bucket, hostArray.length);
+ return null;
+ }
+
+ return hostArray[bucket];
+ }
+
+ /**
+ * Gets the number of buckets.
+ *
+ * @return the number of buckets
+ */
+ @JsonIgnore
+ public int size() {
+ return (hostArray != null ? hostArray.length : 0);
+ }
+
+ /**
+ * Checks the validity of the assignments, verifying that all buckets have
+ * been assigned to a host.
+ *
+ * @throws PoolingFeatureException if the assignments are invalid
+ */
+ @JsonIgnore
+ public void checkValidity() throws PoolingFeatureException {
+ if (hostArray == null || hostArray.length == 0) {
+ throw new PoolingFeatureException("missing hosts in message bucket assignments");
+ }
+
+ if (hostArray.length > MAX_BUCKETS) {
+ throw new PoolingFeatureException("too many hosts in message bucket assignments");
+ }
+
+ for (int x = 0; x < hostArray.length; ++x) {
+ if (hostArray[x] == null) {
+ throw new PoolingFeatureException("bucket " + x + " has no assignment");
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(hostArray);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ BucketAssignments other = (BucketAssignments) obj;
+ if (!Arrays.equals(hostArray, other.hostArray))
+ return false;
+ return true;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
new file mode 100644
index 00000000..b59cfbb2
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
@@ -0,0 +1,180 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Message to forward an event to another host.
+ */
+public class Forward extends Message {
+
+ /**
+ * Number of hops (i.e., number of times it's been forwarded) so far.
+ */
+ private int numHops;
+
+ /**
+ * Time, in milliseconds, at which the message was created.
+ */
+ private long createTimeMs;
+
+ /**
+ * Protocol of the receiving topic.
+ */
+ private CommInfrastructure protocol;
+
+ /**
+ * Topic on which the event was received.
+ */
+ private String topic;
+
+ /**
+ * The event pay load that was received on the topic.
+ */
+ private String payload;
+
+ /**
+ * The request id that was extracted from the event.
+ */
+ private String requestId;
+
+ /**
+ *
+ */
+ public Forward() {
+ super();
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ * @param protocol
+ * @param topic
+ * @param payload the actual event data received on the topic
+ * @param requestId
+ */
+ public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
+ super(source);
+
+ this.numHops = 0;
+ this.createTimeMs = System.currentTimeMillis();
+ this.protocol = protocol;
+ this.topic = topic;
+ this.payload = payload;
+ this.requestId = requestId;
+ }
+
+ /**
+ * Increments {@link #numHops}.
+ */
+ public void bumpNumHops() {
+ ++numHops;
+ }
+
+ public int getNumHops() {
+ return numHops;
+ }
+
+ public void setNumHops(int numHops) {
+ this.numHops = numHops;
+ }
+
+ public long getCreateTimeMs() {
+ return createTimeMs;
+ }
+
+ public void setCreateTimeMs(long createTimeMs) {
+ this.createTimeMs = createTimeMs;
+ }
+
+ public CommInfrastructure getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(CommInfrastructure protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ @JsonIgnore
+ public boolean isExpired(long minCreateTimeMs) {
+ return (createTimeMs < minCreateTimeMs);
+
+ }
+
+ @JsonIgnore
+ public void checkValidity() throws PoolingFeatureException {
+
+ super.checkValidity();
+
+ if (protocol == null) {
+ throw new PoolingFeatureException("missing message protocol");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new PoolingFeatureException("missing message topic");
+ }
+
+ /*
+ * Note: an empty pay load is OK, as an empty message could have been
+ * received on the topic.
+ */
+
+ if (payload == null) {
+ throw new PoolingFeatureException("missing message payload");
+ }
+
+ if (requestId == null || requestId.isEmpty()) {
+ throw new PoolingFeatureException("missing message requestId");
+ }
+
+ if (numHops < 0) {
+ throw new PoolingFeatureException("invalid message hop count");
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
new file mode 100644
index 00000000..2a63a5be
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+/**
+ * Heart beat message sent to self, or to the succeeding host.
+ */
+public class Heartbeat extends Message {
+
+ /**
+ * Time, in milliseconds, when this was created.
+ */
+ private long timestampMs;
+
+ /**
+ *
+ */
+ public Heartbeat() {
+ super();
+
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ * @param timestampMs time, in milliseconds, associated with the message
+ */
+ public Heartbeat(String source, long timestampMs) {
+ super(source);
+
+ this.timestampMs = timestampMs;
+ }
+
+ public long getTimestampMs() {
+ return timestampMs;
+ }
+
+ public void setTimestampMs(long timestampMs) {
+ this.timestampMs = timestampMs;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
new file mode 100644
index 00000000..5de6b8f9
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+/**
+ * Identifies the source host and the bucket assignments which it knows about.
+ */
+public class Identification extends MessageWithAssignments {
+
+ /**
+ *
+ */
+ public Identification() {
+ super();
+
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ * @param assignments
+ */
+ public Identification(String source, BucketAssignments assignments) {
+ super(source, assignments);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
new file mode 100644
index 00000000..0fc48c3c
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Indicates that the "source" of this message is now the "lead" host.
+ */
+public class Leader extends MessageWithAssignments {
+
+ /**
+ *
+ */
+ public Leader() {
+ super();
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ * @param assignments
+ */
+ public Leader(String source, BucketAssignments assignments) {
+ super(source, assignments);
+ }
+
+ /**
+ * Also verifies that buckets have been assigned and that the source is
+ * indeed the leader.
+ */
+ @Override
+ @JsonIgnore
+ public void checkValidity() throws PoolingFeatureException {
+
+ super.checkValidity();
+
+ BucketAssignments assignments = getAssignments();
+ if (assignments == null) {
+ throw new PoolingFeatureException("missing message bucket assignments");
+ }
+
+ String leader = getSource();
+
+ if(!assignments.hasAssignment(leader)) {
+ throw new PoolingFeatureException("leader " + leader + " has no bucket assignments");
+ }
+
+ for (String host : assignments.getHostArray()) {
+ if (host.compareTo(leader) < 0) {
+ throw new PoolingFeatureException("invalid leader " + leader + ", should be " + host);
+ }
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
new file mode 100644
index 00000000..e8a4671d
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Messages sent on the internal topic.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({@Type(value = Forward.class, name = "forward"), @Type(value = Heartbeat.class, name = "heartbeat"),
+ @Type(value = Identification.class, name = "identification"),
+ @Type(value = Leader.class, name = "leader"), @Type(value = Offline.class, name = "offline"),
+ @Type(value = Query.class, name = "query")})
+public class Message {
+
+ /**
+ * Name of the administrative channel.
+ */
+ public static final String ADMIN = "_admin";
+
+ /**
+ * Host that originated the message.
+ */
+ private String source;
+
+ /**
+ * Channel on which the message is routed, which is either the target host
+ * or {@link #ADMIN}.
+ */
+ private String channel;
+
+ /**
+ *
+ */
+ public Message() {
+ super();
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ */
+ public Message(String source) {
+ this.source = source;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public String getChannel() {
+ return channel;
+ }
+
+ public void setChannel(String channel) {
+ this.channel = channel;
+ }
+
+ /**
+ * Checks the validity of the message, including verifying that required
+ * fields are not missing.
+ *
+ * @throws PoolingFeatureException if the message is invalid
+ */
+ @JsonIgnore
+ public void checkValidity() throws PoolingFeatureException {
+ if (source == null || source.isEmpty()) {
+ throw new PoolingFeatureException("missing message source");
+ }
+
+ if (channel == null || channel.isEmpty()) {
+ throw new PoolingFeatureException("missing message channel");
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
new file mode 100644
index 00000000..9fded815
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+import org.onap.policy.drools.pooling.PoolingFeatureException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * A Message that includes bucket assignments.
+ */
+public class MessageWithAssignments extends Message {
+
+ /**
+ * Bucket assignments, as known by the source host.
+ */
+ private BucketAssignments assignments;
+
+ /**
+ *
+ */
+ public MessageWithAssignments() {
+ super();
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ * @param assignments
+ */
+ public MessageWithAssignments(String source, BucketAssignments assignments) {
+ super(source);
+
+ this.assignments = assignments;
+
+ }
+
+ public BucketAssignments getAssignments() {
+ return assignments;
+ }
+
+ public void setAssignments(BucketAssignments assignments) {
+ this.assignments = assignments;
+ }
+
+ /**
+ * If there are any assignments, it verifies there validity.
+ */
+ @Override
+ @JsonIgnore
+ public void checkValidity() throws PoolingFeatureException {
+
+ super.checkValidity();
+
+ if (assignments != null) {
+ assignments.checkValidity();
+ }
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
new file mode 100644
index 00000000..297671ac
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+/**
+ * Indicates that the source host is going offline and will be unable to process
+ * any further requests.
+ */
+public class Offline extends Message {
+
+ /**
+ *
+ */
+ public Offline() {
+ super();
+
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ */
+ public Offline(String source) {
+ super(source);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java
new file mode 100644
index 00000000..c995a288
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.message;
+
+/**
+ * Query the other hosts for their identification.
+ */
+public class Query extends Message {
+
+ /**
+ *
+ */
+ public Query() {
+ super();
+
+ }
+
+ /**
+ *
+ * @param source host on which the message originated
+ */
+ public Query(String source) {
+ super(source);
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
new file mode 100644
index 00000000..5f503a3b
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -0,0 +1,255 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Arrays;
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * The active state. In this state, this host has one more more bucket
+ * assignments and processes any events associated with one of its buckets.
+ * Other events are forwarded to appropriate target hosts.
+ */
+public class ActiveState extends ProcessingState {
+
+ /**
+ * Set of hosts that have been assigned a bucket.
+ */
+ private final TreeSet<String> assigned = new TreeSet<>();
+
+ /**
+ * Host that comes after this host, or {@code null} if it has no successor.
+ */
+ private String succHost = null;
+
+ /**
+ * Host that comes before this host, or "" if it has no predecessor.
+ */
+ private String predHost = "";
+
+ /**
+ * {@code True} if we saw this host's heart beat since the last check,
+ * {@code false} otherwise.
+ */
+ private boolean myHeartbeatSeen = false;
+
+ /**
+ * {@code True} if we saw the predecessor's heart beat since the last check,
+ * {@code false} otherwise.
+ */
+ private boolean predHeartbeatSeen = false;
+
+ /**
+ *
+ * @param mgr
+ */
+ public ActiveState(PoolingManager mgr) {
+ super(mgr, mgr.getAssignments().getLeader());
+
+ assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
+
+ detmNeighbors();
+ }
+
+ /**
+ * Determine this host's neighbors based on the order of the host UUIDs.
+ * Updates {@link #succHost} and {@link #predHost}.
+ */
+ private void detmNeighbors() {
+ if (assigned.size() < 2) {
+ /*
+ * this host is the only one with any assignments - it has no
+ * neighbors
+ */
+ succHost = null;
+ predHost = "";
+ return;
+ }
+
+ if ((succHost = assigned.higher(getHost())) == null) {
+ // wrapped around - successor is the first host in the set
+ succHost = assigned.first();
+ }
+
+ if ((predHost = assigned.lower(getHost())) == null) {
+ // wrapped around - predecessor is the last host in the set
+ predHost = assigned.last();
+ }
+ }
+
+ @Override
+ public void start() {
+ addTimers();
+ genHeartbeat();
+ }
+
+ /**
+ * Adds the timers.
+ */
+ private void addTimers() {
+
+ /*
+ * heart beat generator
+ */
+ long genMs = getProperties().getActiveHeartbeatMs();
+
+ scheduleWithFixedDelay(genMs, genMs, xxx -> {
+ genHeartbeat();
+ return null;
+ });
+
+ /*
+ * my heart beat checker
+ */
+ long interMs = getProperties().getInterHeartbeatMs();
+
+ scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ if (myHeartbeatSeen) {
+ myHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed my heart beat
+
+ return internalTopicFailed();
+ });
+
+ /*
+ * predecessor heart beat checker
+ */
+ if (!predHost.isEmpty()) {
+
+ scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ if (predHeartbeatSeen) {
+ predHeartbeatSeen = false;
+ return null;
+ }
+
+ // missed the predecessor's heart beat
+ publish(makeQuery());
+
+ return goQuery();
+ });
+ }
+ }
+
+ /**
+ * Generates a heart beat for this host and its successor.
+ */
+ private void genHeartbeat() {
+ Heartbeat msg = makeHeartbeat(System.currentTimeMillis());
+ publish(getHost(), msg);
+
+ if (succHost != null) {
+ publish(succHost, msg);
+ }
+ }
+
+ @Override
+ public State process(Heartbeat msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ return null;
+
+ } else if (src.equals(getHost())) {
+ myHeartbeatSeen = true;
+
+ } else if (src.equals(predHost)) {
+ predHeartbeatSeen = true;
+
+ }
+
+ return null;
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String src = msg.getSource();
+
+ if (src == null) {
+ return null;
+
+ } else if (!assigned.contains(src)) {
+ /*
+ * the offline host wasn't assigned any buckets, so just ignore the
+ * message
+ */
+ return null;
+
+ } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
+ /*
+ * Case 1: We are the leader.
+ *
+ * Case 2: Our predecessor was the leader and it has gone offline -
+ * we should become the leader.
+ *
+ * In either case, we are now the leader and we must re-balance the
+ * buckets since one of the hosts has gone offline.
+ */
+
+ assigned.remove(src);
+
+ return becomeLeader(assigned);
+
+ } else {
+ /*
+ * Otherwise, we don't care right now - we'll wait for the leader to
+ * tell us it's been removed.
+ */
+ return null;
+ }
+ }
+
+ /**
+ * Transitions to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ State next = super.process(msg);
+ if (next != null) {
+ return next;
+ }
+
+ return goQuery();
+ }
+
+ protected String getSuccHost() {
+ return succHost;
+ }
+
+ protected String getPredHost() {
+ return predHost;
+ }
+
+ protected boolean isMyHeartbeatSeen() {
+ return myHeartbeatSeen;
+ }
+
+ protected boolean isPredHeartbeatSeen() {
+ return predHeartbeatSeen;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
new file mode 100644
index 00000000..a2da0ea2
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
+ * should only contain a small number of items.
+ */
+public class FilterUtils {
+ // message element names
+ public static final String MSG_CHANNEL = "channel";
+ public static final String MSG_TIMESTAMP = "timestampMs";
+
+ // json element names
+ protected static final String JSON_CLASS = "class";
+ protected static final String JSON_FILTERS = "filters";
+ protected static final String JSON_FIELD = "field";
+ protected static final String JSON_VALUE = "value";
+
+ // values to be stuck into the "class" element
+ protected static final String CLASS_OR = "Or";
+ protected static final String CLASS_AND = "And";
+ protected static final String CLASS_EQUALS = "Equals";
+
+ /**
+ *
+ */
+ private FilterUtils() {
+ super();
+ }
+
+ /**
+ * Makes a filter that verifies that a field equals a value.
+ *
+ * @param field name of the field to check
+ * @param value desired value
+ * @return a map representing an "equals" filter
+ */
+ public static Map<String, Object> makeEquals(String field, String value) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_EQUALS);
+ map.put(JSON_FIELD, field);
+ map.put(JSON_VALUE, value);
+
+ return map;
+ }
+
+ /**
+ * Makes an "and" filter, where all of the items must be true.
+ *
+ * @param items items to be checked
+ * @return an "and" filter
+ */
+ public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_AND);
+ map.put(JSON_FILTERS, items);
+
+ return map;
+ }
+
+ /**
+ * Makes an "or" filter, where at least one of the items must be true.
+ *
+ * @param items items to be checked
+ * @return an "or" filter
+ */
+ public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
+ Map<String, Object> map = new TreeMap<>();
+ map.put(JSON_CLASS, CLASS_OR);
+ map.put(JSON_FILTERS, items);
+
+ return map;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
new file mode 100644
index 00000000..27678360
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * Idle state, used when offline.
+ */
+public class IdleState extends State {
+
+ public IdleState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void stop() {
+ // do nothing - don't even send of "offline" message
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Heartbeat msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Copies the assignments, but doesn't change states.
+ */
+ @Override
+ public State process(Leader msg) {
+ super.process(msg);
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Query msg) {
+ return null;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
new file mode 100644
index 00000000..1c8e4dcc
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import org.onap.policy.drools.pooling.PoolingManager;
+
+/**
+ * The inactive state. In this state, we just wait a bit and then try to
+ * re-activate. In the meantime, all messages are ignored.
+ */
+public class InactiveState extends State {
+
+ /**
+ *
+ * @param mgr
+ */
+ public InactiveState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ schedule(getProperties().getReactivateMs(), xxx -> goStart());
+ }
+
+ /**
+ * Remains in this state.
+ */
+ @Override
+ protected State goInactive() {
+ return null;
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
new file mode 100644
index 00000000..2f830c66
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -0,0 +1,410 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Any state in which events are being processed locally and forwarded, as
+ * appropriate.
+ */
+public class ProcessingState extends State {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
+
+ /**
+ * Current known leader, never {@code null}.
+ */
+ private String leader;
+
+ /**
+ *
+ * @param mgr
+ * @param leader current known leader, which need not be the same as the
+ * assignment leader. Never {@code null}
+ * @throws IllegalArgumentException if an argument is invalid
+ */
+ public ProcessingState(PoolingManager mgr, String leader) {
+ super(mgr);
+
+ if (leader == null) {
+ throw new IllegalArgumentException("null leader");
+ }
+
+ BucketAssignments assignments = mgr.getAssignments();
+
+ if (assignments != null) {
+ String[] arr = assignments.getHostArray();
+ if (arr != null && arr.length == 0) {
+ throw new IllegalArgumentException("zero-length bucket assignments");
+ }
+ }
+
+ this.leader = leader;
+ }
+
+ /**
+ * Generates an Identification message and returns {@code null}.
+ */
+ @Override
+ public State process(Query msg) {
+ publish(makeIdentification());
+ return goQuery();
+ }
+
+ /**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
+ * Sets the assignments.
+ *
+ * @param assignments new assignments, or {@code null}
+ */
+ protected final void setAssignments(BucketAssignments assignments) {
+ if (assignments != null) {
+ startDistributing(assignments);
+ }
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ /**
+ * Sets the leader.
+ *
+ * @param leader the new leader
+ * @throws IllegalArgumentException if an argument is invalid
+ */
+ protected void setLeader(String leader) {
+ if (leader == null) {
+ throw new IllegalArgumentException("null leader");
+ }
+
+ this.leader = leader;
+ }
+
+ /**
+ * Determines if this host is the leader, based on the current assignments.
+ *
+ * @return {@code true} if this host is the leader, {@code false} otherwise
+ */
+ public boolean isLeader() {
+ return getHost().equals(leader);
+ }
+
+ /**
+ * Becomes the leader. Publishes a Leader message and enters the
+ * {@link ActiveState}.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return the new state
+ */
+ protected State becomeLeader(SortedSet<String> alive) {
+ String leader = getHost();
+
+ if (!leader.equals(alive.first())) {
+ throw new IllegalArgumentException(leader + " cannot replace " + alive.first());
+ }
+
+ Leader msg = makeLeader(alive);
+ publish(msg);
+
+ setAssignments(msg.getAssignments());
+
+ return goActive();
+ }
+
+ /**
+ * Makes a leader message. Assumes "this" host is the leader, and thus
+ * appears as the first host in the set of hosts that are still alive.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new message
+ */
+ private Leader makeLeader(Set<String> alive) {
+ return new Leader(getHost(), makeAssignments(alive));
+ }
+
+ /**
+ * Makes a set of bucket assignments. Assumes "this" host is the leader.
+ *
+ * @param alive hosts that are known to be alive
+ *
+ * @return a new set of bucket assignments
+ */
+ private BucketAssignments makeAssignments(Set<String> alive) {
+
+ // make a working array from the CURRENT assignments
+ String[] bucket2host = makeBucketArray();
+
+ TreeSet<String> avail = new TreeSet<>(alive);
+
+ // if we have more hosts than buckets, then remove the extra hosts
+ removeExcessHosts(bucket2host.length, avail);
+
+ // create a host bucket for each available host
+ Map<String, HostBucket> host2hb = new HashMap<>();
+ avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
+
+ // add bucket indices to the appropriate host bucket
+ addIndicesToHostBuckets(bucket2host, host2hb);
+
+ // convert the collection back to an array
+ fillArray(host2hb.values(), bucket2host);
+
+ // update bucket2host with new assignments
+ rebalanceBuckets(host2hb.values(), bucket2host);
+
+ return new BucketAssignments(bucket2host);
+ }
+
+ /**
+ * Makes a bucket array, copying the current assignments, if available.
+ *
+ * @return a new bucket array
+ */
+ private String[] makeBucketArray() {
+ BucketAssignments asgn = getAssignments();
+ if (asgn == null) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ String[] oldArray = asgn.getHostArray();
+ if (oldArray.length == 0) {
+ return new String[BucketAssignments.MAX_BUCKETS];
+ }
+
+ String[] newArray = new String[oldArray.length];
+ System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
+
+ return newArray;
+ }
+
+ /**
+ * Removes excess hosts from the set of available hosts. Assumes "this" host
+ * is the leader, and thus appears as the first host in the set.
+ *
+ * @param maxHosts maximum number of hosts to be retained
+ * @param avail available hosts
+ */
+ private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
+ while (avail.size() > maxHosts) {
+ /*
+ * Don't remove this host, as it's the leader. Since the leader is
+ * always at the front of the sorted set, we'll just pick off hosts
+ * from the back of the set.
+ */
+ String host = avail.last();
+ avail.remove(host);
+
+ logger.warn("not using extra host {} for topic {}", host, getTopic());
+ }
+ }
+
+ /**
+ * Adds bucket indices to {@link HostBucket} objects. Buckets that are
+ * unassigned or assigned to a host that does not appear within the map are
+ * re-assigned to a host that appears within the map.
+ *
+ * @param bucket2host bucket assignments
+ * @param host2data maps a host name to its {@link HostBucket}
+ */
+ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
+ LinkedList<Integer> nullBuckets = new LinkedList<Integer>();
+
+ for (int x = 0; x < bucket2host.length; ++x) {
+ String host = bucket2host[x];
+ if (host == null) {
+ nullBuckets.add(x);
+
+ } else {
+ HostBucket hb = host2data.get(host);
+ if (hb == null) {
+ nullBuckets.add(x);
+
+ } else {
+ hb.add(x);
+ }
+ }
+ }
+
+ // assign the null buckets to other hosts
+ assignNullBuckets(nullBuckets, host2data.values());
+ }
+
+ /**
+ * Assigns null buckets (i.e., those having no assignment) to available
+ * hosts.
+ *
+ * @param buckets available hosts
+ * @param coll collection of current host-bucket assignments
+ */
+ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
+ // assign null buckets to the hosts with the fewest buckets
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (Integer index : buckets) {
+ // add it to the host with the shortest bucket list
+ HostBucket newhb = assignments.pollFirst();
+ newhb.add(index);
+
+ // put the item back into the queue, with its new count
+ assignments.add(newhb);
+ }
+ }
+
+ /**
+ * Re-balances the buckets, taking from those that have a larger count and
+ * giving to those that have a smaller count. Populates an output array with
+ * the new assignments.
+ *
+ * @param coll current bucket assignment
+ * @param bucket2host array to be populated with the new assignments
+ */
+ private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
+ if (coll.size() <= 1) {
+ // only one hosts - nothing to rebalance
+ return;
+ }
+
+ TreeSet<HostBucket> assignments = new TreeSet<>(coll);
+
+ for (;;) {
+ HostBucket smaller = assignments.pollFirst();
+ HostBucket larger = assignments.pollLast();
+
+ if (larger.size() - smaller.size() <= 1) {
+ // it's as balanced as it will get
+ break;
+ }
+
+ // move the bucket from the larger to the smaller
+ Integer b = larger.remove();
+ smaller.add(b);
+
+ bucket2host[b] = smaller.host;
+
+ // put the items back, with their new counts
+ assignments.add(larger);
+ assignments.add(smaller);
+ }
+
+ }
+
+ /**
+ * Fills the array with the host assignments.
+ *
+ * @param coll the host assignments
+ * @param bucket2host array to be filled
+ */
+ private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
+ for (HostBucket hb : coll) {
+ for (Integer index : hb.buckets) {
+ bucket2host[index] = hb.host;
+ }
+ }
+ }
+
+ /**
+ * Tracks buckets that have been assigned to a host.
+ */
+ public static class HostBucket implements Comparable<HostBucket> {
+ /**
+ * Host to which the buckets have been assigned.
+ */
+ private String host;
+
+ /**
+ * Buckets that have been assigned to this host.
+ */
+ private Queue<Integer> buckets = new LinkedList<>();
+
+ /**
+ *
+ * @param host
+ */
+ public HostBucket(String host) {
+ this.host = host;
+ }
+
+ /**
+ * Removes the next bucket from the list.
+ *
+ * @return the next bucket
+ */
+ public final Integer remove() {
+ return buckets.remove();
+ }
+
+ /**
+ * Adds a bucket to the list.
+ *
+ * @param index index of the bucket to add
+ */
+ public final void add(Integer index) {
+ buckets.add(index);
+ }
+
+ /**
+ *
+ * @return the number of buckets assigned to this host
+ */
+ public final int size() {
+ return buckets.size();
+ }
+
+ /**
+ * Compares host buckets, first by the number of buckets, and then by
+ * the host name.
+ */
+ @Override
+ public final int compareTo(HostBucket other) {
+ int d = buckets.size() - other.buckets.size();
+ if (d == 0) {
+ d = host.compareTo(other.host);
+ }
+ return d;
+ }
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
new file mode 100644
index 00000000..57521960
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -0,0 +1,209 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import java.util.TreeSet;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Offline;
+
+// TODO add logging
+
+/**
+ * The Query state. In this state, the host waits for the other hosts to
+ * identify themselves. Eventually, a leader should come forth. If not, it will
+ * transition to the active or inactive state, depending on whether or not it
+ * has an assignment in the current bucket assignments. The other possibility is
+ * that it may <i>become</i> the leader, in which case it will also transition
+ * to the active state.
+ */
+public class QueryState extends ProcessingState {
+
+ /**
+ * Hosts that have sent an "Identification" message. Always includes this
+ * host.
+ */
+ private TreeSet<String> alive = new TreeSet<>();
+
+ /**
+ *
+ * @param mgr
+ */
+ public QueryState(PoolingManager mgr) {
+ // this host is the leader, until a better candidate identifies itself
+ super(mgr, mgr.getHost());
+
+ alive.add(getHost());
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ // start identification timer
+ awaitIdentification();
+ }
+
+ /**
+ * Starts a timer to wait for all Identification messages to arrive.
+ */
+ private void awaitIdentification() {
+
+ /*
+ * Once we've waited long enough for all Identification messages to
+ * arrive, become the leader, assuming we should.
+ */
+
+ schedule(getProperties().getIdentificationMs(), xxx -> {
+
+ if (isLeader()) {
+ // "this" host is the new leader
+ return becomeLeader(alive);
+
+ } else if (hasAssignment()) {
+ /*
+ * this host is not the new leader, but it does have an
+ * assignment - return to the active state while we wait for the
+ * leader
+ */
+ return goActive();
+
+ } else {
+ // not the leader and no assignment yet
+ return goInactive();
+ }
+ });
+ }
+
+ /**
+ * Remains in this state.
+ */
+ @Override
+ public State goQuery() {
+ return null;
+ }
+
+ /**
+ * Determines if this host has an assignment in the CURRENT assignments.
+ *
+ * @return {@code true} if this host has an assignment, {@code false}
+ * otherwise
+ */
+ protected boolean hasAssignment() {
+ BucketAssignments asgn = getAssignments();
+ return (asgn != null && asgn.hasAssignment(getHost()));
+ }
+
+ @Override
+ public State process(Identification msg) {
+
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * If the message leader is better than the leader we have, then go active
+ * with it. Otherwise, simply treat it like an {@link Identification}
+ * message.
+ */
+ @Override
+ public State process(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ return null;
+ }
+
+ // ignore Leader messages from ourself
+ String source = msg.getSource();
+ if (source == null || source.equals(getHost())) {
+ return null;
+ }
+
+ // the new leader must equal the source
+ if (!source.equals(asgn.getLeader())) {
+ return null;
+ }
+
+ // go active, if this has a better leader than the one we have
+ if (source.compareTo(getLeader()) < 0) {
+ return super.process(msg);
+ }
+
+ /*
+ * The message does not have an acceptable leader, but we'll still
+ * record its info.
+ */
+ recordInfo(msg.getSource(), msg.getAssignments());
+
+ return null;
+ }
+
+ /**
+ * Records info from a message, adding the source host name to
+ * {@link #alive}, and updating the bucket assignments.
+ *
+ * @param source the message's source host
+ * @param assignments assignments, or {@code null}
+ */
+ private void recordInfo(String source, BucketAssignments assignments) {
+ // add this message's source host to "alive"
+ if (source != null) {
+ alive.add(source);
+ setLeader(alive.first());
+ }
+
+ if (assignments == null || assignments.getLeader() == null) {
+ return;
+ }
+
+ // record assignments, if we don't have any yet
+ BucketAssignments current = getAssignments();
+ if (current == null) {
+ setAssignments(assignments);
+ return;
+ }
+
+ /*
+ * Record assignments, if the new assignments have a better (i.e.,
+ * lesser) leader.
+ */
+ String curldr = current.getLeader();
+ if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) {
+ setAssignments(assignments);
+ }
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ alive.remove(host);
+ setLeader(alive.first());
+ }
+
+ return null;
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
new file mode 100644
index 00000000..decbdfda
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.Map;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * The start state. Upon entry, a heart beat is generated and the event filter
+ * is changed to look for just that particular message. Once the message is
+ * seen, it goes into the {@link QueryState}.
+ */
+public class StartState extends State {
+
+ /**
+ * Time stamp inserted into the heart beat message.
+ */
+ private long hbTimestampMs = System.currentTimeMillis();
+
+ /**
+ *
+ * @param mgr
+ */
+ public StartState(PoolingManager mgr) {
+ super(mgr);
+ }
+
+ /**
+ *
+ * @return the time stamp inserted into the heart beat message
+ */
+ public long getHbTimestampMs() {
+ return hbTimestampMs;
+ }
+
+ @Override
+ public void start() {
+
+ super.start();
+
+ publish(getHost(), makeHeartbeat(hbTimestampMs));
+
+ schedule(getProperties().getStartHeartbeatMs(), xxx -> internalTopicFailed());
+ }
+
+ /**
+ * Transitions to the query state if the heart beat originated from this
+ * host and its time stamp matches.
+ */
+ @Override
+ public State process(Heartbeat msg) {
+ if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) {
+ // saw our own heart beat - transition to query state
+ publish(makeQuery());
+ return goQuery();
+ }
+
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Processes the assignments, but remains in the current state.
+ */
+ @Override
+ public State process(Leader msg) {
+ super.process(msg);
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Discards the message.
+ */
+ @Override
+ public State process(Query msg) {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getFilter() {
+ // ignore everything except our most recent heart beat message
+ return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
+ makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
+
+ }
+
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
new file mode 100644
index 00000000..1e3a907e
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -0,0 +1,370 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
+import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.PoolingProperties;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
+
+/**
+ * A state in the finite state machine.
+ * <p>
+ * A state may have several timers associated with it, which must be cancelled
+ * whenever the state is changed. Assumes that timers are not continuously added
+ * to the same state.
+ */
+public abstract class State {
+
+ /**
+ * Host pool manager.
+ */
+ private final PoolingManager mgr;
+
+ /**
+ * Timers added by this state.
+ */
+ private final List<ScheduledFuture<?>> timers = new LinkedList<>();
+
+ /**
+ *
+ * @param mgr
+ */
+ public State(PoolingManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Gets the server-side filter to use when polling the DMaaP internal topic.
+ * The default method returns a filter that accepts messages on the admin
+ * channel and on the host's own channel.
+ *
+ * @return the server-side filter to use.
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getFilter() {
+ return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
+ }
+
+ /**
+ * Cancels the timers added by this state.
+ */
+ public void cancelTimers() {
+ for (ScheduledFuture<?> fut : timers) {
+ fut.cancel(false);
+ }
+ }
+
+ /**
+ * Starts the state.
+ */
+ public void start() {
+
+ }
+
+ /**
+ * Indicates that the finite state machine is stopping. Sends an "offline"
+ * message to the other hosts.
+ */
+ public void stop() {
+ publish(makeOffline());
+ }
+
+ /**
+ * Transitions to the "start" state.
+ *
+ * @return the new state
+ */
+ public State goStart() {
+ return mgr.goStart();
+ }
+
+ /**
+ * Transitions to the "query" state.
+ *
+ * @return the new state
+ */
+ public State goQuery() {
+ return mgr.goQuery();
+ }
+
+ /**
+ * Transitions to the "active" state.
+ *
+ * @return the new state
+ */
+ public State goActive() {
+ return mgr.goActive();
+ }
+
+ /**
+ * Transitions to the "inactive" state.
+ *
+ * @return the new state
+ */
+ protected State goInactive() {
+ return mgr.goInactive();
+ }
+
+ /**
+ * Processes a message. The default method passes it to the manager to
+ * handle and returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Forward msg) {
+ mgr.handle(msg);
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Heartbeat msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Identification msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. If this host has a new assignment, then it
+ * transitions to the active state. Otherwise, it transitions to the
+ * inactive state.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Leader msg) {
+ BucketAssignments asgn = msg.getAssignments();
+ if (asgn == null) {
+ return null;
+ }
+
+ String source = msg.getSource();
+ if (source == null) {
+ return null;
+ }
+
+ // the new leader must equal the source
+ if (source.equals(asgn.getLeader())) {
+ startDistributing(asgn);
+
+ if (asgn.hasAssignment(getHost())) {
+ return goActive();
+
+ } else {
+ return goInactive();
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Offline msg) {
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Query msg) {
+ return null;
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Identification msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Leader msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Offline msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message.
+ *
+ * @param msg message to be published
+ */
+ protected void publish(Query msg) {
+ mgr.publishAdmin(msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel
+ * @param msg message to be published
+ */
+ protected void publish(String channel, Forward msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Publishes a message on the specified channel.
+ *
+ * @param channel
+ * @param msg message to be published
+ */
+ protected void publish(String channel, Heartbeat msg) {
+ mgr.publish(channel, msg);
+ }
+
+ /**
+ * Starts distributing messages using the specified bucket assignments.
+ *
+ * @param assignments
+ */
+ protected void startDistributing(BucketAssignments assignments) {
+ if (assignments != null) {
+ mgr.startDistributing(assignments);
+ }
+ }
+
+ /**
+ * Schedules a timer to fire after a delay.
+ *
+ * @param delayMs
+ * @param task
+ */
+ protected void schedule(long delayMs, StateTimerTask task) {
+ timers.add(mgr.schedule(delayMs, task));
+ }
+
+ /**
+ * Schedules a timer to fire repeatedly.
+ *
+ * @param initialDelayMs
+ * @param delayMs
+ * @param task
+ */
+ protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
+ }
+
+ /**
+ * Indicates that the internal topic failed.
+ *
+ * @return a new {@link InactiveState}
+ */
+ protected State internalTopicFailed() {
+ publish(makeOffline());
+ mgr.internalTopicFailed();
+
+ return mgr.goInactive();
+ }
+
+ /**
+ * Makes a heart beat message.
+ *
+ * @param timestampMs time, in milliseconds, associated with the message
+ *
+ * @return a new message
+ */
+ protected Heartbeat makeHeartbeat(long timestampMs) {
+ return new Heartbeat(getHost(), timestampMs);
+ }
+
+ /**
+ * Makes an "offline" message.
+ *
+ * @return a new message
+ */
+ protected Offline makeOffline() {
+ return new Offline(getHost());
+ }
+
+ /**
+ * Makes a query message.
+ *
+ * @return a new message
+ */
+ protected Query makeQuery() {
+ return new Query(getHost());
+ }
+
+ public final BucketAssignments getAssignments() {
+ return mgr.getAssignments();
+ }
+
+ public final String getHost() {
+ return mgr.getHost();
+ }
+
+ public final String getTopic() {
+ return mgr.getTopic();
+ }
+
+ public final PoolingProperties getProperties() {
+ return mgr.getProperties();
+ }
+}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
new file mode 100644
index 00000000..bd388b4e
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling.state;
+
+/**
+ * Task to be executed when a timer fires within a {@link State}.
+ */
+@FunctionalInterface
+public interface StateTimerTask {
+
+ /**
+ * Fires the timer.
+ *
+ * @param arg always {@code null}
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State fire(Void arg);
+
+}