diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main')
26 files changed, 175 insertions, 136 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 278e7fdc..27b81504 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -259,6 +259,8 @@ public class DmaapManager { public static class Factory { /** + * Get topic source. + * * @return the topic sources */ public List<TopicSource> getTopicSources() { @@ -266,6 +268,8 @@ public class DmaapManager { } /** + * Get topic sinks. + * * @return the topic sinks */ public List<TopicSink> getTopicSinks() { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index ad6a1c56..d45bded7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -46,7 +46,9 @@ import org.slf4j.LoggerFactory; /** * Controller/session pooling. Multiple hosts may be launched, all servicing the same * controllers/sessions. When this feature is enabled, the requests are divided across the different - * hosts, instead of all running on a single, active host. <p> With each controller, there is an + * hosts, instead of all running on a single, active host. + * + * <p>With each controller, there is an * associated DMaaP topic that is used for internal communication between the different hosts * serving the controller. */ @@ -87,7 +89,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>(); /** - * + * Constructor. */ public PoolingFeature() { super(); @@ -113,6 +115,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF } /** + * Get active latch. + * * @return a latch that will be decremented when a manager enters the active state */ protected CountDownLatch getActiveLatch() { @@ -126,7 +130,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF @Override public boolean beforeStart(PolicyEngine engine) { - logger.info("initializing " + PoolingProperties.FEATURE_NAME); + logger.info("initializing {}", PoolingProperties.FEATURE_NAME); featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); // remove any generic pooling topic - always use controller-specific property @@ -138,6 +142,14 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF return false; } + @Override + public boolean beforeStart(PolicyController controller) { + return doManager(controller, mgr -> { + mgr.beforeStart(); + return false; + }); + } + /** * Adds the controller and a new pooling manager to {@link #ctlr2pool}. * @@ -177,14 +189,6 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF } @Override - public boolean beforeStart(PolicyController controller) { - return doManager(controller, mgr -> { - mgr.beforeStart(); - return false; - }); - } - - @Override public boolean afterStart(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterStart(); @@ -307,7 +311,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF * Executes a function using the manager associated with the controller. Catches any exceptions * from the function and re-throws it as a runtime exception. * - * @param controller + * @param controller controller * @param func function to be executed * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs @@ -329,7 +333,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Deletes the manager associated with a controller. * - * @param controller + * @param controller controller * @throws PoolingFeatureRtException if an error occurs */ private void deleteManager(PolicyController controller) { @@ -347,10 +351,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF private static interface MgrFunc { /** + * Apply. * - * @param mgr + * @param mgr manager * @return {@code true} if the request was handled by the manager, {@code false} otherwise - * @throws PoolingFeatureException + * @throws PoolingFeatureException feature exception */ public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; } @@ -376,9 +381,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF private String event; /** + * Constructor. * - * @param protocol - * @param topic + * @param protocol protocol + * @param topic topic * @param event the actual event data received on the topic */ public OfferArgs(CommInfrastructure protocol, String topic, String event) { @@ -394,6 +400,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public static class Factory { /** + * Get properties. + * * @param featName feature name * @return the properties for the specified feature */ @@ -405,7 +413,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF * Makes a pooling manager for a controller. * * @param host name/uuid of this host - * @param controller + * @param controller controller * @param props properties to use to configure the manager * @param activeLatch decremented when the manager goes Active * @return a new pooling manager @@ -418,7 +426,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Gets the policy controller associated with a drools controller. * - * @param droolsController + * @param droolsController drools controller * @return the policy controller associated with a drools controller */ public PolicyController getController(DroolsController droolsController) { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index c25dc12d..94956d6b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -86,7 +86,7 @@ public interface PoolingManager { /** * Handles a {@link Forward} event that was received from the internal topic. * - * @param event + * @param event event */ public void handle(Forward event); @@ -94,7 +94,7 @@ public interface PoolingManager { * Schedules a timer to fire after a delay. * * @param delayMs delay, in milliseconds - * @param task + * @param task task * @return a new scheduled task */ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task); @@ -104,7 +104,7 @@ public interface PoolingManager { * * @param initialDelayMs initial delay, in milliseconds * @param delayMs delay, in milliseconds - * @param task + * @param task task * @return a new scheduled task */ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 02ba4ec9..17d520ad 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -20,6 +20,7 @@ package org.onap.policy.drools.pooling; +import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -29,10 +30,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.onap.policy.common.utils.properties.SpecProperties; -import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.utils.properties.SpecProperties; +import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.pooling.extractor.ClassExtractors; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; @@ -50,7 +51,6 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; /** * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, @@ -126,8 +126,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Current state. - * <p> - * This uses a finite state machine, wherein the state object contains all of the data + * + * <p>This uses a finite state machine, wherein the state object contains all of the data * relevant to that state. Each state object has a process() method, specific to each * type of {@link Message} subclass. The method returns the next state object, or * {@code null} if the state is to remain the same. @@ -417,10 +417,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Handles an event from the internal topic. * - * @param topic2 - * @param event - * @return {@code true} if the event was handled, {@code false} if the controller - * should handle it + * @param commType comm infrastructure + * @param topic2 topic + * @param event event */ @Override public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { @@ -440,14 +439,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Called by the PolicyController before it offers the event to the DroolsController. * If the controller is locked, then it isn't processing events. However, they still * need to be forwarded, thus in that case, they are decoded and forwarded. - * <p> - * On the other hand, if the controller is not locked, then we just return immediately + * + * <p>On the other hand, if the controller is not locked, then we just return immediately * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle * it instead, as it already has the decoded message. * - * @param protocol - * @param topic2 - * @param event + * @param protocol protocol + * @param topic2 topic + * @param event event * @return {@code true} if the event was handled by the manager, {@code false} if it * must still be handled by the invoker */ @@ -464,8 +463,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Called by the DroolsController before it inserts the event into the rule engine. * - * @param protocol - * @param topic2 + * @param protocol protocol + * @param topic2 topic * @param event original event text, as received from the Bus * @param event2 event, as an object * @return {@code true} if the event was handled by the manager, {@code false} if it @@ -484,9 +483,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Handles an event from an external topic. * - * @param protocol - * @param topic2 - * @param event + * @param protocol protocol + * @param topic2 topic + * @param event event * @param reqid request id extracted from the event, or {@code null} if it couldn't be * extracted * @return {@code true} if the event was handled by the manager, {@code false} if it @@ -519,7 +518,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Handles an event from an external topic. * - * @param event + * @param event event * @return {@code true} if the event was handled, {@code false} if the invoker should * handle it */ @@ -539,7 +538,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Handles a {@link Forward} event, possibly forwarding it again. * - * @param event + * @param event event * @return {@code true} if the event was handled, {@code false} if the invoker should * handle it */ @@ -595,8 +594,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Decodes an event from a String into an event Object. * - * @param topic2 - * @param event + * @param topic2 topic + * @param event event * @return the decoded event object, or {@code null} if it can't be decoded */ private Object decodeEvent(String topic2, String event) { @@ -625,10 +624,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Makes a {@link Forward}, and validates its contents. * - * @param protocol - * @param topic2 - * @param event - * @param reqid + * @param protocol protocol + * @param topic2 topic + * @param event event + * @param reqid request id * @return a new message, or {@code null} if the message was invalid */ private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) { @@ -661,7 +660,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Injects an event into the controller. * - * @param event + * @param event event */ private void inject(Forward event) { logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic()); @@ -760,6 +759,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private StateTimerTask task; /** + * Constructor. * * @param task task to execute when this timer runs */ @@ -832,9 +832,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param topic topic on which the event was received * @param event event text to be decoded * @return the decoded event - * @throws IllegalArgumentException - * @throw UnsupportedOperationException - * @throws IllegalStateException + * @throws IllegalArgumentException illegal argument + * @throw UnsupportedOperationException unsupported operation + * @throws IllegalStateException illegal state */ public Object decodeEvent(DroolsController drools, String topic, String event) { return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java index 1482366f..795bd29d 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -129,6 +129,8 @@ public class PoolingProperties extends PropertyConfiguration { private long interHeartbeatMs; /** + * Constructor. + * * @param controllerName the name of the controller * @param props set of properties used to configure this * @throws PropertyException if an error occurs diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java index 63aefb7a..b37c33b0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java @@ -20,11 +20,11 @@ package org.onap.policy.drools.pooling; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import org.onap.policy.drools.pooling.message.Message; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; /** * Serialization helper functions. @@ -38,7 +38,7 @@ public class Serializer { private final ObjectMapper mapper = new ObjectMapper(); /** - * + * Constructor. */ public Serializer() { super(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java index 97e96337..a4d1fb1b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java @@ -70,6 +70,7 @@ public class ClassExtractors { private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>(); /** + * Constructor. * * @param props properties that specify how the data is to be extracted from * a given class @@ -267,8 +268,8 @@ public class ClassExtractors { * hierarchically, where each name identifies a particular component within * the hierarchy. Supports retrieval from {@link Map} objects, as well as * via getXxx() methods, or by direct field retrieval. - * <p> - * Note: this will <i>not</i> work if POJOs are contained within a Map. + * + * <p>Note: this will <i>not</i> work if POJOs are contained within a Map. */ private class ComponetizedExtractor implements Extractor { @@ -278,11 +279,12 @@ public class ClassExtractors { private final Extractor[] extractors; /** + * Constructor. * * @param clazz the class associated with the object at the root of the * hierarchy * @param names name associated with each component - * @throws ExtractorException + * @throws ExtractorException extractor exception */ public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException { this.extractors = new Extractor[names.length]; @@ -307,15 +309,12 @@ public class ClassExtractors { * @param comp name of the component to extract * @return a pair containing the extractor and the extracted object's * type - * @throws ExtractorException + * @throws ExtractorException extrator exception */ private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException { - Pair<Extractor, Class<?>> pair = null; - - if (pair == null) { - pair = getMethodExtractor(clazz, comp); - } - + + Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp); + if (pair == null) { pair = getFieldExtractor(clazz, comp); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java index d394795d..9389ab22 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java @@ -37,6 +37,7 @@ public class FieldExtractor implements Extractor { private final Field field; /** + * Constructor. * * @param field field containing the object */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java index 032ea47e..9c5be5ff 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java @@ -37,6 +37,7 @@ public class MapExtractor implements Extractor { private final String key; /** + * Constructor. * * @param key key to the item to extract from the map */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java index 20c4a1a7..3efef5ec 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java @@ -38,6 +38,7 @@ public class MethodExtractor implements Extractor { private final Method method; /** + * Constructor. * * @param method method to invoke to extract the contained object */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java index ee871cbd..b5b64693 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -20,13 +20,13 @@ package org.onap.policy.drools.pooling.message; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.Arrays; import java.util.HashSet; import java.util.Set; import org.onap.policy.drools.pooling.PoolingFeatureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonIgnore; /** * Bucket assignments, which is simply an array of host names. @@ -57,13 +57,14 @@ public class BucketAssignments { private String[] hostArray = null; /** - * + * Constructor. */ public BucketAssignments() { super(); } /** + * Constructor. * * @param hostArray maps a bucket number (i.e., array index) to a host. All values * must be non-null @@ -204,12 +205,15 @@ public class BucketAssignments { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } BucketAssignments other = (BucketAssignments) obj; return Arrays.equals(hostArray, other.hostArray); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java index fb3d4eb2..d4037e90 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java @@ -20,9 +20,9 @@ package org.onap.policy.drools.pooling.message; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.pooling.PoolingFeatureException; -import com.fasterxml.jackson.annotation.JsonIgnore; /** * Message to forward an event to another host. @@ -67,12 +67,13 @@ public class Forward extends Message { } /** + * Constructor. * * @param source host on which the message originated - * @param protocol - * @param topic + * @param protocol protocol + * @param topic topic * @param payload the actual event data received on the topic - * @param requestId + * @param requestId request id */ public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) { super(source); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java index 2a63a5be..50a34138 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Heartbeat.java @@ -31,7 +31,7 @@ public class Heartbeat extends Message { private long timestampMs; /** - * + * Constructor. */ public Heartbeat() { super(); @@ -39,6 +39,7 @@ public class Heartbeat extends Message { } /** + * Constructor. * * @param source host on which the message originated * @param timestampMs time, in milliseconds, associated with the message diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java index 5de6b8f9..6bb88363 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Identification.java @@ -26,7 +26,7 @@ package org.onap.policy.drools.pooling.message; public class Identification extends MessageWithAssignments { /** - * + * Constructor. */ public Identification() { super(); @@ -34,9 +34,10 @@ public class Identification extends MessageWithAssignments { } /** + * Constructor. * * @param source host on which the message originated - * @param assignments + * @param assignments assignments */ public Identification(String source, BucketAssignments assignments) { super(source, assignments); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java index 0fc48c3c..7464a531 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java @@ -20,8 +20,8 @@ package org.onap.policy.drools.pooling.message; -import org.onap.policy.drools.pooling.PoolingFeatureException; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.onap.policy.drools.pooling.PoolingFeatureException; /** * Indicates that the "source" of this message is now the "lead" host. @@ -29,16 +29,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore; public class Leader extends MessageWithAssignments { /** - * + * Constructor. */ public Leader() { super(); } /** + * Constructor. * * @param source host on which the message originated - * @param assignments + * @param assignments assignments */ public Leader(String source, BucketAssignments assignments) { super(source, assignments); @@ -61,7 +62,7 @@ public class Leader extends MessageWithAssignments { String leader = getSource(); - if(!assignments.hasAssignment(leader)) { + if (!assignments.hasAssignment(leader)) { throw new PoolingFeatureException("leader " + leader + " has no bucket assignments"); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java index e8a4671d..215cdaec 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java @@ -20,11 +20,11 @@ package org.onap.policy.drools.pooling.message; -import org.onap.policy.drools.pooling.PoolingFeatureException; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.onap.policy.drools.pooling.PoolingFeatureException; /** * Messages sent on the internal topic. @@ -53,13 +53,14 @@ public class Message { private String channel; /** - * + * Constructor. */ public Message() { super(); } /** + * Constructor. * * @param source host on which the message originated */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java index 9fded815..4a0b8658 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java @@ -20,8 +20,8 @@ package org.onap.policy.drools.pooling.message; -import org.onap.policy.drools.pooling.PoolingFeatureException; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.onap.policy.drools.pooling.PoolingFeatureException; /** * A Message that includes bucket assignments. @@ -34,16 +34,17 @@ public class MessageWithAssignments extends Message { private BucketAssignments assignments; /** - * + * Constructor. */ public MessageWithAssignments() { super(); } /** + * Constructor. * * @param source host on which the message originated - * @param assignments + * @param assignments assignements */ public MessageWithAssignments(String source, BucketAssignments assignments) { super(source); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java index 297671ac..487c4f3e 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Offline.java @@ -27,7 +27,7 @@ package org.onap.policy.drools.pooling.message; public class Offline extends Message { /** - * + * Constructor. */ public Offline() { super(); @@ -35,6 +35,7 @@ public class Offline extends Message { } /** + * Constructor. * * @param source host on which the message originated */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java index c995a288..8c9898ee 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Query.java @@ -26,7 +26,7 @@ package org.onap.policy.drools.pooling.message; public class Query extends Message { /** - * + * Constructor. */ public Query() { super(); @@ -34,6 +34,7 @@ public class Query extends Message { } /** + * Constructor. * * @param source host on which the message originated */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index 8f0a902a..58205ddd 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -66,8 +66,9 @@ public class ActiveState extends ProcessingState { private boolean predHeartbeatSeen = false; /** + * Constructor. * - * @param mgr + * @param mgr pooling manager */ public ActiveState(PoolingManager mgr) { super(mgr, mgr.getAssignments().getLeader()); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java index a2da0ea2..069ca656 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java @@ -44,7 +44,7 @@ public class FilterUtils { protected static final String CLASS_EQUALS = "Equals"; /** - * + * Constructor. */ private FilterUtils() { super(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index f717aa52..579dc16d 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -35,8 +35,9 @@ public class InactiveState extends State { private static final Logger logger = LoggerFactory.getLogger(InactiveState.class); /** + * Constructor. * - * @param mgr + * @param mgr pooling manager */ public InactiveState(PoolingManager mgr) { super(mgr); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index e9dc0324..7fed6a15 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -48,8 +48,9 @@ public class ProcessingState extends State { private String leader; /** + * Constructor. * - * @param mgr + * @param mgr pooling manager * @param leader current known leader, which need not be the same as the assignment * leader. Never {@code null} * @throws IllegalArgumentException if an argument is invalid @@ -308,10 +309,10 @@ public class ProcessingState extends State { } // move the bucket from the larger to the smaller - Integer b = larger.remove(); - smaller.add(b); + Integer bucket = larger.remove(); + smaller.add(bucket); - bucket2host[b] = smaller.host; + bucket2host[bucket] = smaller.host; // put the items back, with their new counts assignments.add(larger); @@ -349,8 +350,9 @@ public class ProcessingState extends State { private Queue<Integer> buckets = new LinkedList<>(); /** + * Constructor. * - * @param host + * @param host host */ public HostBucket(String host) { this.host = host; @@ -375,6 +377,7 @@ public class ProcessingState extends State { } /** + * Size. * * @return the number of buckets assigned to this host */ @@ -388,11 +391,11 @@ public class ProcessingState extends State { */ @Override public final int compareTo(HostBucket other) { - int d = buckets.size() - other.buckets.size(); - if (d == 0) { - d = host.compareTo(other.host); + int diff = buckets.size() - other.buckets.size(); + if (diff == 0) { + diff = host.compareTo(other.host); } - return d; + return diff; } @Override diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 1a4da150..ea74f03a 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -51,8 +51,9 @@ public class QueryState extends ProcessingState { private boolean sawSelfIdent = false; /** + * Constructor. * - * @param mgr + * @param mgr manager */ public QueryState(PoolingManager mgr) { // this host is the leader, until a better candidate identifies itself diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index 3068cfc9..59d264ec 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -25,6 +25,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP; import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd; import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals; import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; + import java.util.Map; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; @@ -47,14 +48,16 @@ public class StartState extends State { private long hbTimestampMs = System.currentTimeMillis(); /** + * Constructor. * - * @param mgr + * @param mgr pooling manager */ public StartState(PoolingManager mgr) { super(mgr); } /** + * Get Heart beat time stamp in milliseconds. * * @return the time stamp inserted into the heart beat message */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index a1be2a7c..edffb3fe 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling.state; import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL; import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals; import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; + import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -42,8 +43,8 @@ import org.slf4j.LoggerFactory; /** * A state in the finite state machine. - * <p> - * A state may have several timers associated with it, which must be cancelled whenever + * + * <p>A state may have several timers associated with it, which must be cancelled whenever * the state is changed. Assumes that timers are not continuously added to the same state. */ public abstract class State { @@ -61,8 +62,9 @@ public abstract class State { private final List<CancellableScheduledTask> timers = new LinkedList<>(); /** + * Constructor. * - * @param mgr + * @param mgr pooling manager */ public State(PoolingManager mgr) { this.mgr = mgr; @@ -197,6 +199,28 @@ public abstract class State { } /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** + * Processes a message. The default method just returns {@code null}. + * + * @param msg message to be processed + * @return the new state, or {@code null} if the state is unchanged + */ + public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); + return null; + } + + /** * Determines if a message is valid and did not originate from this host. * * @param msg message to be validated @@ -227,28 +251,6 @@ public abstract class State { } /** - * Processes a message. The default method just returns {@code null}. - * - * @param msg message to be processed - * @return the new state, or {@code null} if the state is unchanged - */ - public State process(Offline msg) { - logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); - return null; - } - - /** - * Processes a message. The default method just returns {@code null}. - * - * @param msg message to be processed - * @return the new state, or {@code null} if the state is unchanged - */ - public State process(Query msg) { - logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); - return null; - } - - /** * Publishes a message. * * @param msg message to be published @@ -287,7 +289,7 @@ public abstract class State { /** * Publishes a message on the specified channel. * - * @param channel + * @param channel channel * @param msg message to be published */ protected final void publish(String channel, Forward msg) { @@ -297,7 +299,7 @@ public abstract class State { /** * Publishes a message on the specified channel. * - * @param channel + * @param channel channel * @param msg message to be published */ protected final void publish(String channel, Heartbeat msg) { @@ -307,7 +309,7 @@ public abstract class State { /** * Starts distributing messages using the specified bucket assignments. * - * @param assignments + * @param assignments assignments */ protected final void startDistributing(BucketAssignments assignments) { if (assignments != null) { @@ -318,8 +320,8 @@ public abstract class State { /** * Schedules a timer to fire after a delay. * - * @param delayMs - * @param task + * @param delayMs delay in ms + * @param task task */ protected final void schedule(long delayMs, StateTimerTask task) { timers.add(mgr.schedule(delayMs, task)); @@ -328,9 +330,9 @@ public abstract class State { /** * Schedules a timer to fire repeatedly. * - * @param initialDelayMs - * @param delayMs - * @param task + * @param initialDelayMs initial delay ms + * @param delayMs delay ms + * @param task task */ protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); |