summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java46
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java6
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java62
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java2
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java6
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java19
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java9
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java5
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java9
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java5
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java7
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java2
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java21
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java3
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java5
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java68
26 files changed, 175 insertions, 136 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 278e7fdc..27b81504 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -259,6 +259,8 @@ public class DmaapManager {
public static class Factory {
/**
+ * Get topic source.
+ *
* @return the topic sources
*/
public List<TopicSource> getTopicSources() {
@@ -266,6 +268,8 @@ public class DmaapManager {
}
/**
+ * Get topic sinks.
+ *
* @return the topic sinks
*/
public List<TopicSink> getTopicSinks() {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index ad6a1c56..d45bded7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -46,7 +46,9 @@ import org.slf4j.LoggerFactory;
/**
* Controller/session pooling. Multiple hosts may be launched, all servicing the same
* controllers/sessions. When this feature is enabled, the requests are divided across the different
- * hosts, instead of all running on a single, active host. <p> With each controller, there is an
+ * hosts, instead of all running on a single, active host.
+ *
+ * <p>With each controller, there is an
* associated DMaaP topic that is used for internal communication between the different hosts
* serving the controller.
*/
@@ -87,7 +89,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
/**
- *
+ * Constructor.
*/
public PoolingFeature() {
super();
@@ -113,6 +115,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
/**
+ * Get active latch.
+ *
* @return a latch that will be decremented when a manager enters the active state
*/
protected CountDownLatch getActiveLatch() {
@@ -126,7 +130,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean beforeStart(PolicyEngine engine) {
- logger.info("initializing " + PoolingProperties.FEATURE_NAME);
+ logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
// remove any generic pooling topic - always use controller-specific property
@@ -138,6 +142,14 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
return false;
}
+ @Override
+ public boolean beforeStart(PolicyController controller) {
+ return doManager(controller, mgr -> {
+ mgr.beforeStart();
+ return false;
+ });
+ }
+
/**
* Adds the controller and a new pooling manager to {@link #ctlr2pool}.
*
@@ -177,14 +189,6 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
@Override
- public boolean beforeStart(PolicyController controller) {
- return doManager(controller, mgr -> {
- mgr.beforeStart();
- return false;
- });
- }
-
- @Override
public boolean afterStart(PolicyController controller) {
return doManager(controller, mgr -> {
mgr.afterStart();
@@ -307,7 +311,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* Executes a function using the manager associated with the controller. Catches any exceptions
* from the function and re-throws it as a runtime exception.
*
- * @param controller
+ * @param controller controller
* @param func function to be executed
* @return {@code true} if the function handled the request, {@code false} otherwise
* @throws PoolingFeatureRtException if an error occurs
@@ -329,7 +333,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Deletes the manager associated with a controller.
*
- * @param controller
+ * @param controller controller
* @throws PoolingFeatureRtException if an error occurs
*/
private void deleteManager(PolicyController controller) {
@@ -347,10 +351,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private static interface MgrFunc {
/**
+ * Apply.
*
- * @param mgr
+ * @param mgr manager
* @return {@code true} if the request was handled by the manager, {@code false} otherwise
- * @throws PoolingFeatureException
+ * @throws PoolingFeatureException feature exception
*/
public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
@@ -376,9 +381,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private String event;
/**
+ * Constructor.
*
- * @param protocol
- * @param topic
+ * @param protocol protocol
+ * @param topic topic
* @param event the actual event data received on the topic
*/
public OfferArgs(CommInfrastructure protocol, String topic, String event) {
@@ -394,6 +400,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public static class Factory {
/**
+ * Get properties.
+ *
* @param featName feature name
* @return the properties for the specified feature
*/
@@ -405,7 +413,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* Makes a pooling manager for a controller.
*
* @param host name/uuid of this host
- * @param controller
+ * @param controller controller
* @param props properties to use to configure the manager
* @param activeLatch decremented when the manager goes Active
* @return a new pooling manager
@@ -418,7 +426,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Gets the policy controller associated with a drools controller.
*
- * @param droolsController
+ * @param droolsController drools controller
* @return the policy controller associated with a drools controller
*/
public PolicyController getController(DroolsController droolsController) {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index c25dc12d..94956d6b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -86,7 +86,7 @@ public interface PoolingManager {
/**
* Handles a {@link Forward} event that was received from the internal topic.
*
- * @param event
+ * @param event event
*/
public void handle(Forward event);
@@ -94,7 +94,7 @@ public interface PoolingManager {
* Schedules a timer to fire after a delay.
*
* @param delayMs delay, in milliseconds
- * @param task
+ * @param task task
* @return a new scheduled task
*/
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task);
@@ -104,7 +104,7 @@ public interface PoolingManager {
*
* @param initialDelayMs initial delay, in milliseconds
* @param delayMs delay, in milliseconds
- * @param task
+ * @param task task
* @return a new scheduled task
*/
public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 02ba4ec9..17d520ad 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -20,6 +20,7 @@
package org.onap.policy.drools.pooling;
+import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -29,10 +30,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.onap.policy.common.utils.properties.SpecProperties;
-import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.properties.SpecProperties;
+import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
@@ -50,7 +51,6 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
/**
* Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
@@ -126,8 +126,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Current state.
- * <p>
- * This uses a finite state machine, wherein the state object contains all of the data
+ *
+ * <p>This uses a finite state machine, wherein the state object contains all of the data
* relevant to that state. Each state object has a process() method, specific to each
* type of {@link Message} subclass. The method returns the next state object, or
* {@code null} if the state is to remain the same.
@@ -417,10 +417,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Handles an event from the internal topic.
*
- * @param topic2
- * @param event
- * @return {@code true} if the event was handled, {@code false} if the controller
- * should handle it
+ * @param commType comm infrastructure
+ * @param topic2 topic
+ * @param event event
*/
@Override
public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
@@ -440,14 +439,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Called by the PolicyController before it offers the event to the DroolsController.
* If the controller is locked, then it isn't processing events. However, they still
* need to be forwarded, thus in that case, they are decoded and forwarded.
- * <p>
- * On the other hand, if the controller is not locked, then we just return immediately
+ *
+ * <p>On the other hand, if the controller is not locked, then we just return immediately
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol
- * @param topic2
- * @param event
+ * @param protocol protocol
+ * @param topic2 topic
+ * @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
@@ -464,8 +463,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol
- * @param topic2
+ * @param protocol protocol
+ * @param topic2 topic
* @param event original event text, as received from the Bus
* @param event2 event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
@@ -484,9 +483,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Handles an event from an external topic.
*
- * @param protocol
- * @param topic2
- * @param event
+ * @param protocol protocol
+ * @param topic2 topic
+ * @param event event
* @param reqid request id extracted from the event, or {@code null} if it couldn't be
* extracted
* @return {@code true} if the event was handled by the manager, {@code false} if it
@@ -519,7 +518,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Handles an event from an external topic.
*
- * @param event
+ * @param event event
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
@@ -539,7 +538,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event
+ * @param event event
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
@@ -595,8 +594,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Decodes an event from a String into an event Object.
*
- * @param topic2
- * @param event
+ * @param topic2 topic
+ * @param event event
* @return the decoded event object, or {@code null} if it can't be decoded
*/
private Object decodeEvent(String topic2, String event) {
@@ -625,10 +624,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Makes a {@link Forward}, and validates its contents.
*
- * @param protocol
- * @param topic2
- * @param event
- * @param reqid
+ * @param protocol protocol
+ * @param topic2 topic
+ * @param event event
+ * @param reqid request id
* @return a new message, or {@code null} if the message was invalid
*/
private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
@@ -661,7 +660,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Injects an event into the controller.
*
- * @param event
+ * @param event event
*/
private void inject(Forward event) {
logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
@@ -760,6 +759,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private StateTimerTask task;
/**
+ * Constructor.
*
* @param task task to execute when this timer runs
*/
@@ -832,9 +832,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param topic topic on which the event was received
* @param event event text to be decoded
* @return the decoded event
- * @throws IllegalArgumentException
- * @throw UnsupportedOperationException
- * @throws IllegalStateException
+ * @throws IllegalArgumentException illegal argument
+ * @throw UnsupportedOperationException unsupported operation
+ * @throws IllegalStateException illegal state
*/
public Object decodeEvent(DroolsController drools, String topic, String event) {
return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
index 1482366f..795bd29d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -129,6 +129,8 @@ public class PoolingProperties extends PropertyConfiguration {
private long interHeartbeatMs;
/**
+ * Constructor.
+ *
* @param controllerName the name of the controller
* @param props set of properties used to configure this
* @throws PropertyException if an error occurs
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index 63aefb7a..b37c33b0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -20,11 +20,11 @@
package org.onap.policy.drools.pooling;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.onap.policy.drools.pooling.message.Message;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Serialization helper functions.
@@ -38,7 +38,7 @@ public class Serializer {
private final ObjectMapper mapper = new ObjectMapper();
/**
- *
+ * Constructor.
*/
public Serializer() {
super();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
index 97e96337..a4d1fb1b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -70,6 +70,7 @@ public class ClassExtractors {
private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
/**
+ * Constructor.
*
* @param props properties that specify how the data is to be extracted from
* a given class
@@ -267,8 +268,8 @@ public class ClassExtractors {
* hierarchically, where each name identifies a particular component within
* the hierarchy. Supports retrieval from {@link Map} objects, as well as
* via getXxx() methods, or by direct field retrieval.
- * <p>
- * Note: this will <i>not</i> work if POJOs are contained within a Map.
+ *
+ * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
*/
private class ComponetizedExtractor implements Extractor {
@@ -278,11 +279,12 @@ public class ClassExtractors {
private final Extractor[] extractors;
/**
+ * Constructor.
*
* @param clazz the class associated with the object at the root of the
* hierarchy
* @param names name associated with each component
- * @throws ExtractorException
+ * @throws ExtractorException extractor exception
*/
public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
this.extractors = new Extractor[names.length];
@@ -307,15 +309,12 @@ public class ClassExtractors {
* @param comp name of the component to extract
* @return a pair containing the extractor and the extracted object's
* type
- * @throws ExtractorException
+ * @throws ExtractorException extrator exception
*/
private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
- Pair<Extractor, Class<?>> pair = null;
-
- if (pair == null) {
- pair = getMethodExtractor(clazz, comp);
- }
-
+
+ Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
+
if (pair == null) {
pair = getFieldExtractor(clazz, comp);
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
index d394795d..9389ab22 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
@@ -37,6 +37,7 @@ public class FieldExtractor implements Extractor {
private final Field field;
/**
+ * Constructor.
*
* @param field field containing the object
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
index 032ea47e..9c5be5ff 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
@@ -37,6 +37,7 @@ public class MapExtractor implements Extractor {
private final String key;
/**
+ * Constructor.
*
* @param key key to the item to extract from the map
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
index 20c4a1a7..3efef5ec 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
@@ -38,6 +38,7 @@ public class MethodExtractor implements Extractor {
private final Method method;
/**
+ * Constructor.
*
* @param method method to invoke to extract the contained object
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index ee871cbd..b5b64693 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -20,13 +20,13 @@
package org.onap.policy.drools.pooling.message;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.onap.policy.drools.pooling.PoolingFeatureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Bucket assignments, which is simply an array of host names.
@@ -57,13 +57,14 @@ public class BucketAssignments {
private String[] hostArray = null;
/**
- *
+ * Constructor.
*/
public BucketAssignments() {
super();
}
/**
+ * Constructor.
*
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
@@ -204,12 +205,15 @@ public class BucketAssignments {
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
BucketAssignments other = (BucketAssignments) obj;
return Arrays.equals(hostArray, other.hostArray);
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
index fb3d4eb2..d4037e90 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
@@ -20,9 +20,9 @@
package org.onap.policy.drools.pooling.message;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.pooling.PoolingFeatureException;
-import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Message to forward an event to another host.
@@ -67,12 +67,13 @@ public class Forward extends Message {
}
/**
+ * Constructor.
*
* @param source host on which the message originated
- * @param protocol
- * @param topic
+ * @param protocol protocol
+ * @param topic topic
* @param payload the actual event data received on the topic
- * @param requestId
+ * @param requestId request id
*/
public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
super(source);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
index 2a63a5be..50a34138 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java
@@ -31,7 +31,7 @@ public class Heartbeat extends Message {
private long timestampMs;
/**
- *
+ * Constructor.
*/
public Heartbeat() {
super();
@@ -39,6 +39,7 @@ public class Heartbeat extends Message {
}
/**
+ * Constructor.
*
* @param source host on which the message originated
* @param timestampMs time, in milliseconds, associated with the message
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
index 5de6b8f9..6bb88363 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java
@@ -26,7 +26,7 @@ package org.onap.policy.drools.pooling.message;
public class Identification extends MessageWithAssignments {
/**
- *
+ * Constructor.
*/
public Identification() {
super();
@@ -34,9 +34,10 @@ public class Identification extends MessageWithAssignments {
}
/**
+ * Constructor.
*
* @param source host on which the message originated
- * @param assignments
+ * @param assignments assignments
*/
public Identification(String source, BucketAssignments assignments) {
super(source, assignments);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
index 0fc48c3c..7464a531 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
@@ -20,8 +20,8 @@
package org.onap.policy.drools.pooling.message;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
* Indicates that the "source" of this message is now the "lead" host.
@@ -29,16 +29,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
public class Leader extends MessageWithAssignments {
/**
- *
+ * Constructor.
*/
public Leader() {
super();
}
/**
+ * Constructor.
*
* @param source host on which the message originated
- * @param assignments
+ * @param assignments assignments
*/
public Leader(String source, BucketAssignments assignments) {
super(source, assignments);
@@ -61,7 +62,7 @@ public class Leader extends MessageWithAssignments {
String leader = getSource();
- if(!assignments.hasAssignment(leader)) {
+ if (!assignments.hasAssignment(leader)) {
throw new PoolingFeatureException("leader " + leader + " has no bucket assignments");
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
index e8a4671d..215cdaec 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
@@ -20,11 +20,11 @@
package org.onap.policy.drools.pooling.message;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
* Messages sent on the internal topic.
@@ -53,13 +53,14 @@ public class Message {
private String channel;
/**
- *
+ * Constructor.
*/
public Message() {
super();
}
/**
+ * Constructor.
*
* @param source host on which the message originated
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
index 9fded815..4a0b8658 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
@@ -20,8 +20,8 @@
package org.onap.policy.drools.pooling.message;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
* A Message that includes bucket assignments.
@@ -34,16 +34,17 @@ public class MessageWithAssignments extends Message {
private BucketAssignments assignments;
/**
- *
+ * Constructor.
*/
public MessageWithAssignments() {
super();
}
/**
+ * Constructor.
*
* @param source host on which the message originated
- * @param assignments
+ * @param assignments assignements
*/
public MessageWithAssignments(String source, BucketAssignments assignments) {
super(source);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
index 297671ac..487c4f3e 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java
@@ -27,7 +27,7 @@ package org.onap.policy.drools.pooling.message;
public class Offline extends Message {
/**
- *
+ * Constructor.
*/
public Offline() {
super();
@@ -35,6 +35,7 @@ public class Offline extends Message {
}
/**
+ * Constructor.
*
* @param source host on which the message originated
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java
index c995a288..8c9898ee 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java
@@ -26,7 +26,7 @@ package org.onap.policy.drools.pooling.message;
public class Query extends Message {
/**
- *
+ * Constructor.
*/
public Query() {
super();
@@ -34,6 +34,7 @@ public class Query extends Message {
}
/**
+ * Constructor.
*
* @param source host on which the message originated
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
index 8f0a902a..58205ddd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -66,8 +66,9 @@ public class ActiveState extends ProcessingState {
private boolean predHeartbeatSeen = false;
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr pooling manager
*/
public ActiveState(PoolingManager mgr) {
super(mgr, mgr.getAssignments().getLeader());
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
index a2da0ea2..069ca656 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
@@ -44,7 +44,7 @@ public class FilterUtils {
protected static final String CLASS_EQUALS = "Equals";
/**
- *
+ * Constructor.
*/
private FilterUtils() {
super();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index f717aa52..579dc16d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -35,8 +35,9 @@ public class InactiveState extends State {
private static final Logger logger = LoggerFactory.getLogger(InactiveState.class);
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr pooling manager
*/
public InactiveState(PoolingManager mgr) {
super(mgr);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
index e9dc0324..7fed6a15 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -48,8 +48,9 @@ public class ProcessingState extends State {
private String leader;
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr pooling manager
* @param leader current known leader, which need not be the same as the assignment
* leader. Never {@code null}
* @throws IllegalArgumentException if an argument is invalid
@@ -308,10 +309,10 @@ public class ProcessingState extends State {
}
// move the bucket from the larger to the smaller
- Integer b = larger.remove();
- smaller.add(b);
+ Integer bucket = larger.remove();
+ smaller.add(bucket);
- bucket2host[b] = smaller.host;
+ bucket2host[bucket] = smaller.host;
// put the items back, with their new counts
assignments.add(larger);
@@ -349,8 +350,9 @@ public class ProcessingState extends State {
private Queue<Integer> buckets = new LinkedList<>();
/**
+ * Constructor.
*
- * @param host
+ * @param host host
*/
public HostBucket(String host) {
this.host = host;
@@ -375,6 +377,7 @@ public class ProcessingState extends State {
}
/**
+ * Size.
*
* @return the number of buckets assigned to this host
*/
@@ -388,11 +391,11 @@ public class ProcessingState extends State {
*/
@Override
public final int compareTo(HostBucket other) {
- int d = buckets.size() - other.buckets.size();
- if (d == 0) {
- d = host.compareTo(other.host);
+ int diff = buckets.size() - other.buckets.size();
+ if (diff == 0) {
+ diff = host.compareTo(other.host);
}
- return d;
+ return diff;
}
@Override
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
index 1a4da150..ea74f03a 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -51,8 +51,9 @@ public class QueryState extends ProcessingState {
private boolean sawSelfIdent = false;
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr manager
*/
public QueryState(PoolingManager mgr) {
// this host is the leader, until a better candidate identifies itself
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index 3068cfc9..59d264ec 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -25,6 +25,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+
import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
@@ -47,14 +48,16 @@ public class StartState extends State {
private long hbTimestampMs = System.currentTimeMillis();
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
super(mgr);
}
/**
+ * Get Heart beat time stamp in milliseconds.
*
* @return the time stamp inserted into the heart beat message
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index a1be2a7c..edffb3fe 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling.state;
import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
+
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -42,8 +43,8 @@ import org.slf4j.LoggerFactory;
/**
* A state in the finite state machine.
- * <p>
- * A state may have several timers associated with it, which must be cancelled whenever
+ *
+ * <p>A state may have several timers associated with it, which must be cancelled whenever
* the state is changed. Assumes that timers are not continuously added to the same state.
*/
public abstract class State {
@@ -61,8 +62,9 @@ public abstract class State {
private final List<CancellableScheduledTask> timers = new LinkedList<>();
/**
+ * Constructor.
*
- * @param mgr
+ * @param mgr pooling manager
*/
public State(PoolingManager mgr) {
this.mgr = mgr;
@@ -197,6 +199,28 @@ public abstract class State {
}
/**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Offline msg) {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
+ * Processes a message. The default method just returns {@code null}.
+ *
+ * @param msg message to be processed
+ * @return the new state, or {@code null} if the state is unchanged
+ */
+ public State process(Query msg) {
+ logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
+ return null;
+ }
+
+ /**
* Determines if a message is valid and did not originate from this host.
*
* @param msg message to be validated
@@ -227,28 +251,6 @@ public abstract class State {
}
/**
- * Processes a message. The default method just returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Offline msg) {
- logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
- return null;
- }
-
- /**
- * Processes a message. The default method just returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Query msg) {
- logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
- return null;
- }
-
- /**
* Publishes a message.
*
* @param msg message to be published
@@ -287,7 +289,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
*
- * @param channel
+ * @param channel channel
* @param msg message to be published
*/
protected final void publish(String channel, Forward msg) {
@@ -297,7 +299,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
*
- * @param channel
+ * @param channel channel
* @param msg message to be published
*/
protected final void publish(String channel, Heartbeat msg) {
@@ -307,7 +309,7 @@ public abstract class State {
/**
* Starts distributing messages using the specified bucket assignments.
*
- * @param assignments
+ * @param assignments assignments
*/
protected final void startDistributing(BucketAssignments assignments) {
if (assignments != null) {
@@ -318,8 +320,8 @@ public abstract class State {
/**
* Schedules a timer to fire after a delay.
*
- * @param delayMs
- * @param task
+ * @param delayMs delay in ms
+ * @param task task
*/
protected final void schedule(long delayMs, StateTimerTask task) {
timers.add(mgr.schedule(delayMs, task));
@@ -328,9 +330,9 @@ public abstract class State {
/**
* Schedules a timer to fire repeatedly.
*
- * @param initialDelayMs
- * @param delayMs
- * @param task
+ * @param initialDelayMs initial delay ms
+ * @param delayMs delay ms
+ * @param task task
*/
protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));