diff options
author | Jim Hahn <jrh3@att.com> | 2018-04-05 13:27:38 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2018-04-16 14:57:04 -0400 |
commit | a69e3a9565186ae06c96215f9c73198d0651b529 (patch) | |
tree | 03c33749e20766e97321ae9e326d644381653234 /feature-pooling-dmaap/src/main/java/org/onap | |
parent | 155e6205083493851894623b855cd0ec99bbe7c2 (diff) |
Sonar fixes to pooling
Made various changes to the pooling feature to address some of the
sonar issues.
Remove duplicate classes, Pair & Triple.
Fix sonar issue about duplicate process(xxx) methods.
Remove extra items from pom.xml and add assembly builder.
Fix license text in pom.xml, inadvertently reformatted.
Fix a few typos in comments and change LinkedList to Queue.
Move assembly.xml to correct source directory.
Replace ScheduledFuture<?> with CancellableScheduledTask, to satisfy
sonar.
Eliminate "TODO" items: add logging, delay after sending Offline
message.
Add more logging in process(Message) methods.
Begin creating end-to-end junit test, fixed bugs found as a result.
Restore logback-test.xml to WARN.
Fix merge conflict - restored test properties file.
Change-Id: Ic70a8cee49678ea0fc3da309699aec1f6088fe70
Issue-ID: POLICY-728
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap')
22 files changed, 702 insertions, 692 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java new file mode 100644 index 00000000..b16f44ad --- /dev/null +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +/** + * A scheduled task that can be cancelled. + */ +@FunctionalInterface +public interface CancellableScheduledTask { + + /** + * Cancels the scheduled task. + */ + public void cancel(); +} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 98543f29..102eda75 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -58,14 +58,14 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the - * internal DMaaP topic. + * Topic sources. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSource> sources; /** - * Topic sinks. In theory, there's only one item in this list, the internal - * DMaaP topic. + * Topic sinks. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSink> sinks; @@ -112,8 +112,8 @@ public class DmaapManager { } /** - * Used by junit tests to set the factory used to create various objects - * used by this class. + * Used by junit tests to set the factory used to create various objects used by this + * class. * * @param factory the new factory */ @@ -129,8 +129,7 @@ public class DmaapManager { * Finds the topic source associated with the internal DMaaP topic. * * @return the topic source - * @throws PoolingFeatureException if the source doesn't exist or is not - * filterable + * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { for (TopicSource src : sources) { @@ -174,6 +173,7 @@ public class DmaapManager { } try { + logger.info("start publishing to topic {}", topic); topicSink.start(); publishing = true; @@ -184,13 +184,28 @@ public class DmaapManager { /** * Stops the publisher. + * + * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued + * messages and close */ - public void stopPublisher() { + public void stopPublisher(long waitMs) { if (!publishing) { return; } + /* + * Give the sink a chance to transmit messages in the queue. It would be better if + * "waitMs" could be passed to sink.stop(), but that isn't an option at this time. + */ + try { + Thread.sleep(waitMs); + + } catch (InterruptedException e) { + logger.warn("message transmission stopped due to {}", e.getMessage()); + } + try { + logger.info("stop publishing to topic {}", topic); publishing = false; topicSink.stop(); @@ -209,6 +224,7 @@ public class DmaapManager { return; } + logger.info("start consuming from topic {}", topic); topicSource.register(listener); consuming = true; } @@ -223,6 +239,7 @@ public class DmaapManager { return; } + logger.info("stop consuming from topic {}", topic); consuming = false; topicSource.unregister(listener); } @@ -230,16 +247,16 @@ public class DmaapManager { /** * Sets the server-side filter to be used by the consumer. * - * @param filter the filter string, or {@code null} if no filter is to be - * used + * @param filter the filter string, or {@code null} if no filter is to be used * @throws PoolingFeatureException if the topic is not filterable */ public void setFilter(String filter) throws PoolingFeatureException { try { + logger.debug("change filter for topic {} to {}", topic, filter); topicSource.setFilter(filter); } catch (UnsupportedOperationException e) { - throw new PoolingFeatureException("cannot filter topic " + topic); + throw new PoolingFeatureException("cannot filter topic " + topic, e); } } @@ -247,8 +264,7 @@ public class DmaapManager { * Publishes a message to the sink. * * @param msg message to be published - * @throws PoolingFeatureException if an error occurs or the publisher isn't - * running + * @throws PoolingFeatureException if an error occurs or the publisher isn't running */ public void publish(String msg) throws PoolingFeatureException { if (!publishing) { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java deleted file mode 100644 index d2f32043..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -// TODO move to policy-utils - -import java.lang.annotation.Annotation; -import java.lang.reflect.Field; -import java.util.Properties; -import org.onap.policy.common.utils.properties.SpecPropertyConfiguration; -import org.onap.policy.common.utils.properties.exception.PropertyException; - -/** - * Checks whether or not a feature is enabled. The name of the "enable" property - * is assumed to be of the form accepted by a {@link SpecPropertyConfiguration}, - * which contains a substitution place-holder into which a "specializer" (e.g., - * controller or session name) is substituted. - */ -public class FeatureEnabledChecker { - - /** - * - */ - private FeatureEnabledChecker() { - super(); - } - - /** - * Determines if a feature is enabled for a particular specializer. - * - * @param props properties from which to extract the "enabled" flag - * @param specializer specializer to be substituted into the property name - * when extracting - * @param propName the name of the "enabled" property - * @return {@code true} if the feature is enabled, or {@code false} if it is - * not enabled (or if the property doesn't exist) - * @throws IllegalArgumentException if the "enabled" property is not a - * boolean value - */ - public static boolean isFeatureEnabled(Properties props, String specializer, String propName) { - - try { - return new Config(specializer, props, propName).isEnabled(); - - } catch (PropertyException e) { - throw new IllegalArgumentException("cannot check property " + propName, e); - } - } - - - /** - * Configuration used to extract the value. - */ - private static class Config extends SpecPropertyConfiguration { - - /** - * There is a bit of trickery here. This annotation is just a - * place-holder to get the superclass to invoke the - * {@link #setValue(java.lang.reflect.Field, Properties, Property) - * setValue()} method. When that's invoked, we'll substitute - * {@link #propOverride} instead of this annotation. - */ - @Property(name = "feature-enabled-property-place-holder") - private boolean enabled; - - /** - * Annotation that will actually be used to set the field. - */ - private Property propOverride; - - /** - * - * @param specializer specializer to be substituted into the property - * name when extracting - * @param props properties from which to extract the "enabled" flag - * @param propName the name of the "enabled" property - * @throws PropertyException if an error occurs - */ - public Config(String specializer, Properties props, String propName) throws PropertyException { - super(specializer); - - propOverride = new Property() { - - @Override - public String name() { - return propName; - } - - @Override - public String defaultValue() { - // feature is disabled by default - return "false"; - } - - @Override - public String accept() { - return ""; - } - - @Override - public Class<? extends Annotation> annotationType() { - return Property.class; - } - }; - - setAllFields(props); - } - - /** - * Substitutes {@link #propOverride} for "prop". - */ - @Override - protected boolean setValue(Field field, Properties props, Property prop) throws PropertyException { - return super.setValue(field, props, propOverride); - } - - /** - * - * @return {@code true} if the feature is enabled, {@code false} - * otherwise - */ - public boolean isEnabled() { - return enabled; - } - }; -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index da47a031..21cbc4db 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -20,44 +20,41 @@ package org.onap.policy.drools.pooling; -import java.io.IOException; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; -import org.onap.policy.drools.core.PolicySessionFeatureAPI; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.persistence.SystemPersistence; import org.onap.policy.drools.system.PolicyController; -import org.onap.policy.drools.utils.PropertyUtil; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.util.FeatureEnabledChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Controller/session pooling. Multiple hosts may be launched, all servicing the - * same controllers/sessions. When this feature is enabled, the requests are - * divided across the different hosts, instead of all running on a single, - * active host. + * Controller/session pooling. Multiple hosts may be launched, all servicing the same + * controllers/sessions. When this feature is enabled, the requests are divided across the + * different hosts, instead of all running on a single, active host. * <p> - * With each controller, there is an associated DMaaP topic that is used for - * internal communication between the different hosts serving the controller. + * With each controller, there is an associated DMaaP topic that is used for internal + * communication between the different hosts serving the controller. */ -public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI { +public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI { private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class); - // TODO state-management doesn't allow more than one active host at a time - /** * Factory used to create objects. */ private static Factory factory; /** - * Entire set of feature properties, including those specific to various - * controllers. + * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; @@ -67,9 +64,9 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); /** - * Arguments passed to beforeOffer(), which are saved for when the - * beforeInsert() is called later. As multiple threads can be active within - * the methods at the same time, we must keep this in thread local storage. + * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is + * called later. As multiple threads can be active within the methods at the same + * time, we must keep this in thread local storage. */ private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>(); @@ -98,20 +95,11 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl return 0; } - /** - * @throws PoolingFeatureRtException if the properties cannot be read or are - * invalid - */ @Override - public void globalInit(String[] args, String configDir) { - logger.info("initializing pooling feature"); - - try { - featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties"); - - } catch (IOException ex) { - throw new PoolingFeatureRtException(ex); - } + public boolean beforeStart(PolicyEngine engine) { + logger.info("initializing " + PoolingProperties.FEATURE_NAME); + featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); + return false; } /** @@ -205,8 +193,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl @Override public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) { /* - * As this is invoked a lot, we'll directly call the manager's method - * instead of using the functional interface via doManager(). + * As this is invoked a lot, we'll directly call the manager's method instead of + * using the functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { @@ -226,6 +214,7 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl OfferArgs args = offerArgs.get(); if (args == null) { + logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert"); return false; } @@ -234,16 +223,21 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl controller = factory.getController(droolsController); } catch (IllegalArgumentException | IllegalStateException e) { + logger.warn("cannot get controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId(), e); return false; } + if (controller == null) { + logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId()); return false; } /* - * As this is invoked a lot, we'll directly call the manager's method - * instead of using the functional interface via doManager(). + * As this is invoked a lot, we'll directly call the manager's method instead of + * using the functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { @@ -264,14 +258,12 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl } /** - * Executes a function using the manager associated with the controller. - * Catches any exceptions from the function and re-throws it as a runtime - * exception. + * Executes a function using the manager associated with the controller. Catches any + * exceptions from the function and re-throws it as a runtime exception. * * @param controller * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} - * otherwise + * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doManager(PolicyController controller, MgrFunc func) { @@ -284,26 +276,28 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl return func.apply(mgr); } catch (PoolingFeatureException e) { - throw e.toRuntimeException(); + throw new PoolingFeatureRtException(e); } } /** - * Executes a function using the manager associated with the controller and - * then deletes the manager. Catches any exceptions from the function and - * re-throws it as a runtime exception. + * Executes a function using the manager associated with the controller and then + * deletes the manager. Catches any exceptions from the function and re-throws it as a + * runtime exception. * * @param controller * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} - * otherwise + * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) { + String name = controller.getName(); + logger.info("remove feature-pool-dmaap manager for {}", name); + // NOTE: using "remove()" instead of "get()" - PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName()); + PoolingManagerImpl mgr = ctlr2pool.remove(name); if (mgr == null) { return false; @@ -321,8 +315,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl /** * * @param mgr - * @return {@code true} if the request was handled by the manager, - * {@code false} otherwise + * @return {@code true} if the request was handled by the manager, {@code false} + * otherwise * @throws PoolingFeatureException */ public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; @@ -367,6 +361,14 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl public static class Factory { /** + * @param featName feature name + * @return the properties for the specified feature + */ + public Properties getProperties(String featName) { + return SystemPersistence.manager.getProperties(featName); + } + + /** * Makes a pooling manager for a controller. * * @param controller diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java index 5efd1414..c3c81879 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java @@ -47,13 +47,4 @@ public class PoolingFeatureException extends Exception { super(message, cause, enableSuppression, writableStackTrace); } - /** - * Converts the exception to a runtime exception. - * - * @return a new runtime exception, wrapping this exception - */ - public PoolingFeatureRtException toRuntimeException() { - return new PoolingFeatureRtException(this); - } - } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index de08d1e1..5036b605 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Message; @@ -48,19 +47,19 @@ public interface PoolingManager { public String getHost(); /** - * Gets the name of the internal DMaaP topic used by this manager to - * communicate with its other hosts. + * Gets the name of the internal DMaaP topic used by this manager to communicate with + * its other hosts. * * @return the name of the internal DMaaP topic */ public String getTopic(); /** - * Indicates that communication with internal DMaaP topic failed, typically - * due to a missed heart beat. Stops the PolicyController. + * Indicates that communication with internal DMaaP topic failed, typically due to a + * missed heart beat. Stops the PolicyController. * - * @return a latch that can be used to determine when the controller's - * stop() method has completed + * @return a latch that can be used to determine when the controller's stop() method + * has completed */ public CountDownLatch internalTopicFailed(); @@ -68,14 +67,16 @@ public interface PoolingManager { * Starts distributing requests according to the given bucket assignments. * * @param assignments must <i>not</i> be {@code null} + * @return a latch that can be used to determine when the events in the event queue + * have all be processed */ - public void startDistributing(BucketAssignments assignments); + public CountDownLatch startDistributing(BucketAssignments assignments); /** * Gets the current bucket assignments. * - * @return the current bucket assignments, or {@code null} if no assignments - * have been made + * @return the current bucket assignments, or {@code null} if no assignments have been + * made */ public BucketAssignments getAssignments(); @@ -95,8 +96,7 @@ public interface PoolingManager { public void publish(String channel, Message msg); /** - * Handles a {@link Forward} event that was received from the internal - * topic. + * Handles a {@link Forward} event that was received from the internal topic. * * @param event */ @@ -107,9 +107,9 @@ public interface PoolingManager { * * @param delayMs delay, in milliseconds * @param task - * @return a future that can be used to cancel the timer + * @return a new scheduled task */ - public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task); + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task); /** * Schedules a timer to fire repeatedly. @@ -117,9 +117,9 @@ public interface PoolingManager { * @param initialDelayMs initial delay, in milliseconds * @param delayMs delay, in milliseconds * @param task - * @return a future that can be used to cancel the timer + * @return a new scheduled task */ - public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); /** * Transitions to the "start" state. diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index cd71670d..422efdd7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -38,6 +39,7 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.state.ActiveState; import org.onap.policy.drools.pooling.state.IdleState; import org.onap.policy.drools.pooling.state.InactiveState; @@ -52,39 +54,30 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; /** - * Implementation of a {@link PoolingManager}. Until bucket assignments have - * been made, events coming from external topics are saved in a queue for later - * processing. Once assignments are made, the saved events are processed. In - * addition, while the controller is locked, events are still forwarded to other - * hosts and bucket assignments are still updated, based on any {@link Leader} - * messages that it receives. + * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, + * events coming from external topics are saved in a queue for later processing. Once + * assignments are made, the saved events are processed. In addition, while the controller + * is locked, events are still forwarded to other hosts and bucket assignments are still + * updated, based on any {@link Leader} messages that it receives. */ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class); - // TODO metrics, audit logging - /** * Maximum number of times a message can be forwarded. */ public static final int MAX_HOPS = 5; /** - * Type of item that the extractors will be extracting. - */ - private static final String EXTRACTOR_TYPE = "requestId"; - - /** - * Prefix for extractor properties. + * Factory used to create various objects. Can be overridden during junit testing. */ - private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; + private static Factory factory = new Factory(); /** - * Factory used to create various objects. Can be overridden during junit - * testing. + * ID of the last host that was created. */ - private static Factory factory = new Factory(); + private static final AtomicReference<String> lastHost = new AtomicReference<>(null); /** * ID of this host. @@ -102,14 +95,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final PolicyController controller; /** - * Where to offer events that have been forwarded to this host (i.e, the - * controller). + * Where to offer events that have been forwarded to this host (i.e, the controller). */ private final TopicListener listener; /** - * Used to encode & decode request objects received from & sent to a rule - * engine. + * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -129,19 +120,18 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final ClassExtractors extractors; /** - * Lock used while updating {@link #current}. In general, public methods - * must use this, while private methods assume the lock is already held. + * Lock used while updating {@link #current}. In general, public methods must use + * this, while private methods assume the lock is already held. */ private final Object curLocker = new Object(); /** * Current state. * <p> - * This uses a finite state machine, wherein the state object contains all - * of the data relevant to that state. Each state object has a process() - * method, specific to each type of {@link Message} subclass. The method - * returns the next state object, or {@code null} if the state is to remain - * the same. + * This uses a finite state machine, wherein the state object contains all of the data + * relevant to that state. Each state object has a process() method, specific to each + * type of {@link Message} subclass. The method returns the next state object, or + * {@code null} if the state is to remain the same. */ private State current; @@ -177,13 +167,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.controller = controller; this.props = props; + lastHost.set(this.host); + try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName()); + SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), + props.getSource()); this.extractors = factory.makeClassExtractors(spec); this.dmaapMgr = factory.makeDmaapManager(props); @@ -197,7 +190,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (PoolingFeatureException e) { logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName()); - throw e.toRuntimeException(); + throw new PoolingFeatureRtException(e); } } @@ -210,6 +203,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Used by junit tests. + * + * @return the ID of the last host that was created, or {@code null} if no hosts have + * been created yet + */ + protected static String getLastHost() { + return lastHost.get(); + } + + /** * Should only be used by junit tests. * * @return the current state @@ -234,22 +237,22 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to start. Starts the publisher for - * the internal topic, and creates a thread pool for the timers. + * Indicates that the controller is about to start. Starts the publisher for the + * internal topic, and creates a thread pool for the timers. * - * @throws PoolingFeatureException if the internal topic publisher cannot be - * started + * @throws PoolingFeatureException if the internal topic publisher cannot be started */ public void beforeStart() throws PoolingFeatureException { synchronized (curLocker) { if (scheduler == null) { dmaapMgr.startPublisher(); + logger.debug("make scheduler thread for topic {}", getTopic()); scheduler = factory.makeScheduler(); /* - * Only a handful of timers at any moment, thus we can afford to - * take the time to remove them when they're cancelled. + * Only a handful of timers at any moment, thus we can afford to take the + * time to remove them when they're cancelled. */ scheduler.setRemoveOnCancelPolicy(true); scheduler.setMaximumPoolSize(1); @@ -260,9 +263,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller has successfully started. Starts the - * consumer for the internal topic, enters the {@link StartState}, and sets - * the filter for the initial state. + * Indicates that the controller has successfully started. Starts the consumer for the + * internal topic, enters the {@link StartState}, and sets the filter for the initial + * state. */ public void afterStart() { synchronized (curLocker) { @@ -274,8 +277,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to stop. Stops the consumer, the - * scheduler, and the current state. + * Indicates that the controller is about to stop. Stops the consumer, the scheduler, + * and the current state. */ public void beforeStop() { ScheduledThreadPoolExecutor sched; @@ -287,23 +290,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { if (!(current instanceof IdleState)) { dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); - - // TODO - /* - * Need a brief delay here to allow "offline" message to be - * transmitted? - */ + publishAdmin(new Offline(getHost())); } } if (sched != null) { + logger.debug("stop scheduler for topic {}", getTopic()); sched.shutdownNow(); } } /** - * Indicates that the controller has stopped. Stops the publisher and logs a - * warning if any events are still in the queue. + * Indicates that the controller has stopped. Stops the publisher and logs a warning + * if any events are still in the queue. */ public void afterStop() { synchronized (curLocker) { @@ -312,25 +311,33 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { eventq.clear(); } - dmaapMgr.stopPublisher(); + /* + * stop the publisher, but allow time for any Offline message to be + * transmitted + */ + dmaapMgr.stopPublisher(props.getOfflinePubWaitMs()); } } /** - * Indicates that the controller is about to be locked. Enters the idle - * state, as all it will be doing is forwarding messages. + * Indicates that the controller is about to be locked. Enters the idle state, as all + * it will be doing is forwarding messages. */ public void beforeLock() { + logger.info("locking manager for topic {}", getTopic()); + synchronized (curLocker) { changeState(new IdleState(this)); } } /** - * Indicates that the controller has been unlocked. Enters the start state, - * if the controller is running. + * Indicates that the controller has been unlocked. Enters the start state, if the + * controller is running. */ public void afterUnlock() { + logger.info("unlocking manager for topic {}", getTopic()); + synchronized (curLocker) { if (controller.isAlive() && current instanceof IdleState && scheduler != null) { changeState(new StartState(this)); @@ -339,8 +346,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Changes the finite state machine to a new state, provided the new state - * is not {@code null}. + * Changes the finite state machine to a new state, provided the new state is not + * {@code null}. * * @param newState new state, or {@code null} if to remain unchanged */ @@ -379,30 +386,46 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { CountDownLatch latch = new CountDownLatch(1); /* - * We don't want to build up items in our queue if we can't forward them - * to other hosts, so we just stop the controller. + * We don't want to build up items in our queue if we can't forward them to other + * hosts, so we just stop the controller. * * Use a background thread to prevent deadlocks. */ - new Thread() { - @Override - public void run() { - controller.stop(); - latch.countDown(); - } - }.start(); + new Thread(() -> { + controller.stop(); + latch.countDown(); + }).start(); return latch; } @Override - public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) { - return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override - public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { - return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, + TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override @@ -412,6 +435,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public void publish(String channel, Message msg) { + logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic()); + msg.setChannel(channel); try { @@ -434,8 +459,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param topic2 * @param event - * @return {@code true} if the event was handled, {@code false} if the - * controller should handle it + * @return {@code true} if the event was handled, {@code false} if the controller + * should handle it */ @Override public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { @@ -452,20 +477,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the PolicyController before it offers the event to the - * DroolsController. If the controller is locked, then it isn't processing - * events. However, they still need to be forwarded, thus in that case, they - * are decoded and forwarded. + * Called by the PolicyController before it offers the event to the DroolsController. + * If the controller is locked, then it isn't processing events. However, they still + * need to be forwarded, thus in that case, they are decoded and forwarded. * <p> - * On the other hand, if the controller is not locked, then we just return - * immediately and let {@link #beforeInsert(Object, String, String, Object) - * beforeInsert()} handle it instead, as it already has the decoded message. + * On the other hand, if the controller is not locked, then we just return immediately + * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle + * it instead, as it already has the decoded message. * * @param protocol * @param topic2 * @param event - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { @@ -478,15 +502,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the DroolsController before it inserts the event into the rule - * engine. + * Called by the DroolsController before it inserts the event into the rule engine. * * @param protocol * @param topic2 * @param event original event text, as received from the Bus * @param event2 event, as an object - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) { @@ -504,10 +527,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param protocol * @param topic2 * @param event - * @param reqid request id extracted from the event, or {@code null} if it - * couldn't be extracted - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @param reqid request id extracted from the event, or {@code null} if it couldn't be + * extracted + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) { if (reqid == null) { @@ -524,6 +547,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { Forward ev = makeForward(protocol, topic2, event, reqid); if (ev == null) { // invalid args - consume the message + logger.warn("constructed an invalid Forward message on topic {}", getTopic()); return true; } @@ -536,12 +560,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles an event from an external topic. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleExternal(Forward event) { if (assignments == null) { // no bucket assignments yet - add it to the queue + logger.info("queued event for request {}", event.getRequestId()); eventq.add(event); // we've consumed the event @@ -556,17 +581,17 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles a {@link Forward} event, possibly forwarding it again. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleEvent(Forward event) { - int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size(); - String target = assignments.getAssignedHost(bucket); + String target = assignments.getAssignedHost(event.getRequestId().hashCode()); if (target == null) { /* * This bucket has no assignment - just discard the event */ + logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic()); return true; } @@ -574,6 +599,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /* * Message belongs to this host - allow the controller to handle it. */ + logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic()); return false; } @@ -583,6 +609,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { + logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -678,16 +705,21 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param event */ private void inject(Forward event) { - intercept = false; - listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic()); - intercept = true; + try { + intercept = false; + listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + + } finally { + intercept = true; + } } /** - * Handles an event from the internal topic. This uses reflection to - * identify the appropriate process() method to invoke, based on the type of - * Message that was decoded. + * Handles an event from the internal topic. This uses reflection to identify the + * appropriate process() method to invoke, based on the type of Message that was + * decoded. * * @param event the serialized {@link Message} read from the internal topic */ @@ -711,29 +743,48 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (NoSuchMethodException | SecurityException e) { logger.error("no processor for message {} for topic {}", clazz, topic, e); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - logger.error("failed to process message {} for topic {}", clazz, topic, e); - - } catch (PoolingFeatureException e) { + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | PoolingFeatureException e) { logger.error("failed to process message {} for topic {}", clazz, topic, e); } } @Override - public void startDistributing(BucketAssignments assignments) { - if (assignments == null) { - return; + public CountDownLatch startDistributing(BucketAssignments asgn) { + if (asgn == null) { + return null; } + logger.info("new assignments for topic {}", getTopic()); + synchronized (curLocker) { - this.assignments = assignments; + assignments = asgn; + } + + /* + * publish the events from the event queue, but do it in a background thread so + * that the state machine can enter its correct state BEFORE we start processing + * the events + */ + CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + synchronized (curLocker) { + if (assignments == null) { + return; + } - // now that we have assignments, we can process the queue - Forward ev; - while ((ev = eventq.poll()) != null) { - handle(ev); + // now that we have assignments, we can process the queue + Forward ev; + while ((ev = eventq.poll()) != null) { + handle(ev); + } + + latch.countDown(); } - } + }).start(); + + return latch; } @Override @@ -762,8 +813,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Action to run a timer task. Only runs the task if the machine is still in - * the state that it was in when the timer was created. + * Action to run a timer task. Only runs the task if the machine is still in the state + * that it was in when the timer was created. */ private class TimerAction implements Runnable { @@ -790,7 +841,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { public void run() { synchronized (curLocker) { if (current == origState) { - changeState(task.fire(null)); + changeState(task.fire()); } } } @@ -818,7 +869,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @return a new set of extractors */ public ClassExtractors makeClassExtractors(Properties props) { - return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE); + return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX, + PoolingProperties.EXTRACTOR_TYPE); } /** @@ -846,8 +898,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param drools drools controller * @param topic topic on which the event was received - * @return {@code true} if the event can be decoded, {@code false} - * otherwise + * @return {@code true} if the event can be decoded, {@code false} otherwise */ public boolean canDecodeEvent(DroolsController drools, String topic) { return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java index 1cbe5cb9..4e8de0d0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -30,6 +30,11 @@ import org.onap.policy.common.utils.properties.exception.PropertyException; public class PoolingProperties extends SpecPropertyConfiguration { /** + * The feature name, used to retrieve properties. + */ + public static final String FEATURE_NAME = "feature-pooling-dmaap"; + + /** * Feature properties all begin with this prefix. */ public static final String PREFIX = "pooling."; @@ -51,6 +56,17 @@ public class PoolingProperties extends SpecPropertyConfiguration { public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds"; public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds"; public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds"; + public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; + + /** + * Type of item that the extractors will be extracting. + */ + public static final String EXTRACTOR_TYPE = "requestId"; + + /** + * Prefix for extractor properties. + */ + public static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; /** * Properties from which this was constructed. @@ -113,6 +129,13 @@ public class PoolingProperties extends SpecPropertyConfiguration { private long interHeartbeatMs; /** + * Time, in milliseconds, to wait for an "Offline" message to be published + * to DMaaP. + */ + @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") + private long offlinePubWaitMs; + + /** * @param controllerName the name of the controller * @param props set of properties used to configure this * @throws PropertyException if an error occurs @@ -159,4 +182,8 @@ public class PoolingProperties extends SpecPropertyConfiguration { public long getInterHeartbeatMs() { return interHeartbeatMs; } + + public long getOfflinePubWaitMs() { + return offlinePubWaitMs; + } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index e4557404..31b4e614 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -23,8 +23,7 @@ package org.onap.policy.drools.pooling; import java.util.Properties; /** - * Properties with an optional specialization (e.g., session name, controller - * name). + * Properties with an optional specialization (e.g., session name, controller name). */ public class SpecProperties extends Properties { private static final long serialVersionUID = 1L; @@ -41,10 +40,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any - * specialization - * @param specialization the property name specialization (e.g., session - * name) + * @param prefix the property name prefix that appears before any specialization + * @param specialization the property name specialization (e.g., session name) */ public SpecProperties(String prefix, String specialization) { this.prefix = withTrailingDot(prefix); @@ -53,10 +50,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any - * specialization - * @param specialization the property name specialization (e.g., session - * name) + * @param prefix the property name prefix that appears before any specialization + * @param specialization the property name specialization (e.g., session name) * @param props the default properties */ public SpecProperties(String prefix, String specialization, Properties props) { @@ -77,13 +72,14 @@ public class SpecProperties extends Properties { } /** - * Gets the property whose value has the given key, looking first for the - * specialized property name, and then for the generalized property name. + * Gets the property whose value has the given key, looking first for the specialized + * property name, and then for the generalized property name. * * @param key property name, without the specialization - * @return the value from the property set, or {@code null} if the property - * set does not contain the value + * @return the value from the property set, or {@code null} if the property set does + * not contain the value */ + @Override public String getProperty(String key) { if (!key.startsWith(prefix)) { return super.getProperty(key); @@ -106,4 +102,14 @@ public class SpecProperties extends Properties { protected String getSpecPrefix() { return specPrefix; } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java index cb12a6ac..2752ca8c 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java @@ -152,7 +152,7 @@ public class ClassExtractors { * cannot extract data items from objects of this type, so just * allocated a null extractor. */ - logger.warn("missing property " + prefix + clazz.getName()); + logger.warn("missing property {}{}", prefix, clazz.getName()); return new NullExtractor(); } @@ -166,24 +166,24 @@ public class ClassExtractors { */ private Extractor buildExtractor(Class<?> clazz, String value) { if (!value.startsWith("${")) { - logger.warn("property value for " + prefix + clazz.getName() + " does not start with '${'"); + logger.warn("property value for {}{} does not start with '${'", prefix, clazz.getName()); return new NullExtractor(); } if (!value.endsWith("}")) { - logger.warn("property value for " + prefix + clazz.getName() + " does not end with '}'"); + logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName()); return new NullExtractor(); } // get the part in the middle String val = value.substring(2, value.length() - 1); if (val.startsWith(".")) { - logger.warn("property value for " + prefix + clazz.getName() + " begins with '.'"); + logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName()); return new NullExtractor(); } if (val.endsWith(".")) { - logger.warn("property value for " + prefix + clazz.getName() + " ends with '.'"); + logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName()); return new NullExtractor(); } @@ -198,7 +198,7 @@ public class ClassExtractors { return (ext.extractors.length == 1 ? ext.extractors[0] : ext); } catch (ExtractorException e) { - logger.warn("cannot build extractor for " + clazz.getName()); + logger.warn("cannot build extractor for {}", clazz.getName(), e); return new NullExtractor(); } } @@ -255,7 +255,7 @@ public class ClassExtractors { @Override public Object extract(Object object) { - logger.warn("cannot extract " + type + " from " + object.getClass()); + logger.warn("cannot extract {} from {}", type, object.getClass()); return null; } } @@ -374,6 +374,7 @@ public class ClassExtractors { } catch (NoSuchMethodException expected) { // no getXxx() method, maybe there's a field by this name + logger.debug("no method {} in {}", nm, clazz.getName()); return null; } catch (SecurityException e) { @@ -407,9 +408,8 @@ public class ClassExtractors { * @param key item key within the map * @return a new extractor, or {@code null} if the class is not a Map * subclass - * @throws ExtractorException */ - private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) throws ExtractorException { + private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) { if (!Map.class.isAssignableFrom(clazz)) { return null; @@ -445,6 +445,7 @@ public class ClassExtractors { } catch (NoSuchFieldException expected) { // no field by this name - try super class & interfaces + logger.debug("no field {} in {}", name, clazz.getName()); } catch (SecurityException e) { throw new ExtractorException("inaccessible field " + clazz + "." + name, e); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java index 9ed32ae4..38d440cb 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling.extractor; /** * Used to extract an object contained within another object. */ +@FunctionalInterface public interface Extractor { /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java index aff9d860..032ea47e 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java @@ -53,7 +53,7 @@ public class MapExtractor implements Extractor { return map.get(key); } else { - logger.warn("expecting a map, instead of " + object.getClass()); + logger.warn("expecting a map, instead of {}", object.getClass()); return null; } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java index 8fd86c1e..ee871cbd 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -37,9 +37,19 @@ public class BucketAssignments { private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class); /** - * Number of buckets. + * The number of bits in the maximum number of buckets. */ - public static final int MAX_BUCKETS = 1024; + private static final int MAX_BUCKET_BITS = 10; + + /** + * Maximum number of buckets. Must be a power of two. + */ + public static final int MAX_BUCKETS = 1 << MAX_BUCKET_BITS; + + /** + * Used to ensure that a hash code is not negative. + */ + private static final int MAX_BUCKETS_MASK = MAX_BUCKETS - 1; /** * Identifies the host serving a particular bucket. @@ -55,8 +65,8 @@ public class BucketAssignments { /** * - * @param hostArray maps a bucket number (i.e., array index) to a host. All - * values must be non-null + * @param hostArray maps a bucket number (i.e., array index) to a host. All values + * must be non-null */ public BucketAssignments(String[] hostArray) { this.hostArray = hostArray; @@ -97,8 +107,7 @@ public class BucketAssignments { * Determines if a host has an assignment. * * @param host host to be checked - * @return {@code true} if the host has an assignment, {@code false} - * otherwise + * @return {@code true} if the host has an assignment, {@code false} otherwise */ @JsonIgnore public boolean hasAssignment(String host) { @@ -139,23 +148,17 @@ public class BucketAssignments { /** * Gets the host assigned to a given bucket. * - * @param bucket bucket number - * @return the assigned host, or {@code null} if the bucket has no assigned - * host + * @param hashCode hash code of the item whose assignment is desired + * @return the assigned host, or {@code null} if the item has no assigned host */ @JsonIgnore - public String getAssignedHost(int bucket) { - if (hostArray == null) { + public String getAssignedHost(int hashCode) { + if (hostArray == null || hostArray.length == 0) { logger.error("no buckets have been assigned"); return null; } - if (bucket < 0 || bucket >= hostArray.length) { - logger.error("invalid bucket number {} maximum {}", bucket, hostArray.length); - return null; - } - - return hostArray[bucket]; + return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length]; } /** @@ -169,8 +172,8 @@ public class BucketAssignments { } /** - * Checks the validity of the assignments, verifying that all buckets have - * been assigned to a host. + * Checks the validity of the assignments, verifying that all buckets have been + * assigned to a host. * * @throws PoolingFeatureException if the assignments are invalid */ @@ -208,8 +211,6 @@ public class BucketAssignments { if (getClass() != obj.getClass()) return false; BucketAssignments other = (BucketAssignments) obj; - if (!Arrays.equals(hostArray, other.hostArray)) - return false; - return true; + return Arrays.equals(hostArray, other.hostArray); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java index b59cfbb2..6122d361 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java @@ -146,6 +146,7 @@ public class Forward extends Message { } + @Override @JsonIgnore public void checkValidity() throws PoolingFeatureException { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index 5f503a3b..b0a36cd9 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -24,16 +24,20 @@ import java.util.Arrays; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The active state. In this state, this host has one more more bucket - * assignments and processes any events associated with one of its buckets. - * Other events are forwarded to appropriate target hosts. + * The active state. In this state, this host has one more more bucket assignments and + * processes any events associated with one of its buckets. Other events are forwarded to + * appropriate target hosts. */ public class ActiveState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(ActiveState.class); + /** * Set of hosts that have been assigned a bucket. */ @@ -50,8 +54,8 @@ public class ActiveState extends ProcessingState { private String predHost = ""; /** - * {@code True} if we saw this host's heart beat since the last check, - * {@code false} otherwise. + * {@code True} if we saw this host's heart beat since the last check, {@code false} + * otherwise. */ private boolean myHeartbeatSeen = false; @@ -74,14 +78,14 @@ public class ActiveState extends ProcessingState { } /** - * Determine this host's neighbors based on the order of the host UUIDs. - * Updates {@link #succHost} and {@link #predHost}. + * Determine this host's neighbors based on the order of the host UUIDs. Updates + * {@link #succHost} and {@link #predHost}. */ private void detmNeighbors() { if (assigned.size() < 2) { + logger.info("this host has no neighbors on topic {}", getTopic()); /* - * this host is the only one with any assignments - it has no - * neighbors + * this host is the only one with any assignments - it has no neighbors */ succHost = null; predHost = ""; @@ -91,11 +95,13 @@ public class ActiveState extends ProcessingState { if ((succHost = assigned.higher(getHost())) == null) { // wrapped around - successor is the first host in the set succHost = assigned.first(); + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); } if ((predHost = assigned.lower(getHost())) == null) { // wrapped around - predecessor is the last host in the set predHost = assigned.last(); + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } } @@ -109,13 +115,14 @@ public class ActiveState extends ProcessingState { * Adds the timers. */ private void addTimers() { + logger.info("add timers"); /* * heart beat generator */ long genMs = getProperties().getActiveHeartbeatMs(); - scheduleWithFixedDelay(genMs, genMs, xxx -> { + scheduleWithFixedDelay(genMs, genMs, () -> { genHeartbeat(); return null; }); @@ -125,13 +132,14 @@ public class ActiveState extends ProcessingState { */ long interMs = getProperties().getInterHeartbeatMs(); - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (myHeartbeatSeen) { myHeartbeatSeen = false; return null; } // missed my heart beat + logger.error("missed my heartbeat on topic {}", getTopic()); return internalTopicFailed(); }); @@ -141,13 +149,15 @@ public class ActiveState extends ProcessingState { */ if (!predHost.isEmpty()) { - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (predHeartbeatSeen) { predHeartbeatSeen = false; return null; } // missed the predecessor's heart beat + logger.warn("missed predecessor's heartbeat on topic {}", getTopic()); + publish(makeQuery()); return goQuery(); @@ -172,70 +182,85 @@ public class ActiveState extends ProcessingState { String src = msg.getSource(); if (src == null) { + logger.warn("Heartbeat message has no source on topic {}", getTopic()); return null; } else if (src.equals(getHost())) { + logger.info("saw my heartbeat on topic {}", getTopic()); myHeartbeatSeen = true; } else if (src.equals(predHost)) { + logger.info("saw heartbeat from {} on topic {}", src, getTopic()); predHeartbeatSeen = true; - + + } else { + logger.info("ignored heartbeat message from {} on topic {}", src, getTopic()); } return null; } @Override + public State process(Leader msg) { + if (!isValid(msg)) { + return null; + } + + String src = msg.getSource(); + + if (getHost().compareTo(src) < 0) { + // our host would be a better leader - find out what's up + logger.warn("unexpected Leader message from {} on topic {}", src, getTopic()); + return goQuery(); + } + + logger.info("have a new leader {} on topic {}", src, getTopic()); + + return goActive(msg.getAssignments()); + } + + @Override public State process(Offline msg) { String src = msg.getSource(); if (src == null) { + logger.warn("Offline message has no source on topic {}", getTopic()); return null; } else if (!assigned.contains(src)) { /* - * the offline host wasn't assigned any buckets, so just ignore the - * message + * the offline host wasn't assigned any buckets, so just ignore the message */ + logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic()); return null; } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) { /* * Case 1: We are the leader. * - * Case 2: Our predecessor was the leader and it has gone offline - - * we should become the leader. + * Case 2: Our predecessor was the leader and it has gone offline - we should + * become the leader. * - * In either case, we are now the leader and we must re-balance the - * buckets since one of the hosts has gone offline. + * In either case, we are now the leader and we must re-balance the buckets + * since one of the hosts has gone offline. */ + logger.info("Offline message from source {} on topic {}", src, getTopic()); + assigned.remove(src); return becomeLeader(assigned); } else { /* - * Otherwise, we don't care right now - we'll wait for the leader to - * tell us it's been removed. + * Otherwise, we don't care right now - we'll wait for the leader to tell us + * it's been removed. */ + logger.info("ignore Offline message from source {} on topic {}", src, getTopic()); return null; } } - /** - * Transitions to the query state. - */ - @Override - public State process(Query msg) { - State next = super.process(msg); - if (next != null) { - return next; - } - - return goQuery(); - } - protected String getSuccHost() { return succHost; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java index 27678360..4878c241 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java @@ -21,11 +21,6 @@ package org.onap.policy.drools.pooling.state; import org.onap.policy.drools.pooling.PoolingManager; -import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; -import org.onap.policy.drools.pooling.message.Leader; -import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; /** * Idle state, used when offline. @@ -35,51 +30,4 @@ public class IdleState extends State { public IdleState(PoolingManager mgr) { super(mgr); } - - @Override - public void stop() { - // do nothing - don't even send of "offline" message - } - - /** - * Discards the message. - */ - @Override - public State process(Heartbeat msg) { - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Identification msg) { - return null; - } - - /** - * Copies the assignments, but doesn't change states. - */ - @Override - public State process(Leader msg) { - super.process(msg); - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Offline msg) { - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Query msg) { - return null; - } - } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index 1c8e4dcc..6be2fb84 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -21,13 +21,19 @@ package org.onap.policy.drools.pooling.state; import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The inactive state. In this state, we just wait a bit and then try to - * re-activate. In the meantime, all messages are ignored. + * The inactive state. In this state, we just wait a bit and then try to re-activate. In + * the meantime, all messages are ignored. */ public class InactiveState extends State { + private static final Logger logger = LoggerFactory.getLogger(InactiveState.class); + /** * * @param mgr @@ -41,11 +47,36 @@ public class InactiveState extends State { super.start(); - schedule(getProperties().getReactivateMs(), xxx -> goStart()); + schedule(getProperties().getReactivateMs(), () -> goStart()); + } + + @Override + public State process(Leader msg) { + if(isValid(msg)) { + logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + + if(msg.getAssignments().hasAssignment(getHost())) { + logger.info("received Leader message on topic {}", getTopic()); + return goActive(); + } + } + + return null; + } + + /** + * Generates an Identification message and goes to the query state. + */ + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** - * Remains in this state. + * Remains in this state, without resetting any timers. */ @Override protected State goInactive() { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index 2f830c66..1e9bb581 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -30,15 +30,13 @@ import java.util.SortedSet; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Any state in which events are being processed locally and forwarded, as - * appropriate. + * Any state in which events are being processed locally and forwarded, as appropriate. */ public class ProcessingState extends State { @@ -52,8 +50,8 @@ public class ProcessingState extends State { /** * * @param mgr - * @param leader current known leader, which need not be the same as the - * assignment leader. Never {@code null} + * @param leader current known leader, which need not be the same as the assignment + * leader. Never {@code null} * @throws IllegalArgumentException if an argument is invalid */ public ProcessingState(PoolingManager mgr, String leader) { @@ -76,21 +74,31 @@ public class ProcessingState extends State { } /** - * Generates an Identification message and returns {@code null}. + * Goes active with a new set of assignments. + * + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment */ - @Override - public State process(Query msg) { - publish(makeIdentification()); - return goQuery(); + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn.hasAssignment(getHost())) { + return goActive(); + + } else { + return goInactive(); + } } /** - * Makes an Identification message. - * - * @return a new message + * Generates an Identification message and goes to the query state. */ - protected Identification makeIdentification() { - return new Identification(getHost(), getAssignments()); + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** @@ -132,18 +140,17 @@ public class ProcessingState extends State { } /** - * Becomes the leader. Publishes a Leader message and enters the - * {@link ActiveState}. + * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}. * * @param alive hosts that are known to be alive * * @return the new state */ protected State becomeLeader(SortedSet<String> alive) { - String leader = getHost(); + String newLeader = getHost(); - if (!leader.equals(alive.first())) { - throw new IllegalArgumentException(leader + " cannot replace " + alive.first()); + if (!newLeader.equals(alive.first())) { + throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first()); } Leader msg = makeLeader(alive); @@ -155,8 +162,8 @@ public class ProcessingState extends State { } /** - * Makes a leader message. Assumes "this" host is the leader, and thus - * appears as the first host in the set of hosts that are still alive. + * Makes a leader message. Assumes "this" host is the leader, and thus appears as the + * first host in the set of hosts that are still alive. * * @param alive hosts that are known to be alive * @@ -222,8 +229,8 @@ public class ProcessingState extends State { } /** - * Removes excess hosts from the set of available hosts. Assumes "this" host - * is the leader, and thus appears as the first host in the set. + * Removes excess hosts from the set of available hosts. Assumes "this" host is the + * leader, and thus appears as the first host in the set. * * @param maxHosts maximum number of hosts to be retained * @param avail available hosts @@ -231,9 +238,9 @@ public class ProcessingState extends State { private void removeExcessHosts(int maxHosts, SortedSet<String> avail) { while (avail.size() > maxHosts) { /* - * Don't remove this host, as it's the leader. Since the leader is - * always at the front of the sorted set, we'll just pick off hosts - * from the back of the set. + * Don't remove this host, as it's the leader. Since the leader is always at + * the front of the sorted set, we'll just pick off hosts from the back of the + * set. */ String host = avail.last(); avail.remove(host); @@ -243,15 +250,15 @@ public class ProcessingState extends State { } /** - * Adds bucket indices to {@link HostBucket} objects. Buckets that are - * unassigned or assigned to a host that does not appear within the map are - * re-assigned to a host that appears within the map. + * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or + * assigned to a host that does not appear within the map are re-assigned to a host + * that appears within the map. * * @param bucket2host bucket assignments * @param host2data maps a host name to its {@link HostBucket} */ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) { - LinkedList<Integer> nullBuckets = new LinkedList<Integer>(); + LinkedList<Integer> nullBuckets = new LinkedList<>(); for (int x = 0; x < bucket2host.length; ++x) { String host = bucket2host[x]; @@ -274,10 +281,9 @@ public class ProcessingState extends State { } /** - * Assigns null buckets (i.e., those having no assignment) to available - * hosts. + * Assigns null buckets (i.e., those having no assignment) to available hosts. * - * @param buckets available hosts + * @param buckets buckets that still need to be assigned to hosts * @param coll collection of current host-bucket assignments */ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) { @@ -295,9 +301,9 @@ public class ProcessingState extends State { } /** - * Re-balances the buckets, taking from those that have a larger count and - * giving to those that have a smaller count. Populates an output array with - * the new assignments. + * Re-balances the buckets, taking from those that have a larger count and giving to + * those that have a smaller count. Populates an output array with the new + * assignments. * * @param coll current bucket assignment * @param bucket2host array to be populated with the new assignments @@ -349,7 +355,7 @@ public class ProcessingState extends State { /** * Tracks buckets that have been assigned to a host. */ - public static class HostBucket implements Comparable<HostBucket> { + protected static class HostBucket implements Comparable<HostBucket> { /** * Host to which the buckets have been assigned. */ @@ -395,8 +401,8 @@ public class ProcessingState extends State { } /** - * Compares host buckets, first by the number of buckets, and then by - * the host name. + * Compares host buckets, first by the number of buckets, and then by the host + * name. */ @Override public final int compareTo(HostBucket other) { @@ -406,5 +412,15 @@ public class ProcessingState extends State { } return d; } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 57521960..9045165b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -26,26 +26,31 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; - -// TODO add logging +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The Query state. In this state, the host waits for the other hosts to - * identify themselves. Eventually, a leader should come forth. If not, it will - * transition to the active or inactive state, depending on whether or not it - * has an assignment in the current bucket assignments. The other possibility is - * that it may <i>become</i> the leader, in which case it will also transition - * to the active state. + * The Query state. In this state, the host waits for the other hosts to identify + * themselves. Eventually, a leader should come forth. If not, it will transition to the + * active or inactive state, depending on whether or not it has an assignment in the + * current bucket assignments. The other possibility is that it may <i>become</i> the + * leader, in which case it will also transition to the active state. */ public class QueryState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(QueryState.class); + /** - * Hosts that have sent an "Identification" message. Always includes this - * host. + * Hosts that have sent an "Identification" message. Always includes this host. */ private TreeSet<String> alive = new TreeSet<>(); /** + * {@code True} if we saw our own Identification method, {@code false} otherwise. + */ + private boolean sawSelfIdent = false; + + /** * * @param mgr */ @@ -71,44 +76,42 @@ public class QueryState extends ProcessingState { private void awaitIdentification() { /* - * Once we've waited long enough for all Identification messages to - * arrive, become the leader, assuming we should. + * Once we've waited long enough for all Identification messages to arrive, become + * the leader, assuming we should. */ - schedule(getProperties().getIdentificationMs(), xxx -> { + schedule(getProperties().getIdentificationMs(), () -> { + + if (!sawSelfIdent) { + // didn't see our identification + logger.error("missed our own Ident message on topic {}", getTopic()); + return internalTopicFailed(); - if (isLeader()) { + } else if (isLeader()) { // "this" host is the new leader + logger.info("this host is the new leader for topic {}", getTopic()); return becomeLeader(alive); } else if (hasAssignment()) { /* - * this host is not the new leader, but it does have an - * assignment - return to the active state while we wait for the - * leader + * this host is not the new leader, but it does have an assignment - + * return to the active state while we wait for the leader */ + logger.info("no new leader on topic {}", getTopic()); return goActive(); } else { // not the leader and no assignment yet + logger.info("no new leader on topic {}", getTopic()); return goInactive(); } }); } /** - * Remains in this state. - */ - @Override - public State goQuery() { - return null; - } - - /** * Determines if this host has an assignment in the CURRENT assignments. * - * @return {@code true} if this host has an assignment, {@code false} - * otherwise + * @return {@code true} if this host has an assignment, {@code false} otherwise */ protected boolean hasAssignment() { BucketAssignments asgn = getAssignments(); @@ -116,53 +119,73 @@ public class QueryState extends ProcessingState { } @Override + public State goQuery() { + return null; + } + + @Override public State process(Identification msg) { - recordInfo(msg.getSource(), msg.getAssignments()); + if (getHost().equals(msg.getSource())) { + logger.info("saw our own Ident message on topic {}", getTopic()); + sawSelfIdent = true; + + } else { + logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic()); + recordInfo(msg.getSource(), msg.getAssignments()); + } return null; } /** - * If the message leader is better than the leader we have, then go active - * with it. Otherwise, simply treat it like an {@link Identification} - * message. + * If the message leader is better than the leader we have, then go active with it. + * Otherwise, simply treat it like an {@link Identification} message. */ @Override public State process(Leader msg) { - BucketAssignments asgn = msg.getAssignments(); - if (asgn == null) { + if (!isValid(msg)) { return null; } - // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null || source.equals(getHost())) { - return null; - } - - // the new leader must equal the source - if (!source.equals(asgn.getLeader())) { - return null; - } + BucketAssignments asgn = msg.getAssignments(); - // go active, if this has a better leader than the one we have - if (source.compareTo(getLeader()) < 0) { - return super.process(msg); + // go active, if this has a leader that's the same or better than the one we have + if (source.compareTo(getLeader()) <= 0) { + logger.warn("leader with {} on topic {}", source, getTopic()); + return goActive(asgn); } /* - * The message does not have an acceptable leader, but we'll still - * record its info. + * The message does not have an acceptable leader, but we'll still record its + * info. */ - recordInfo(msg.getSource(), msg.getAssignments()); + logger.info("record leader info from {} on topic {}", source, getTopic()); + recordInfo(source, asgn); + + return null; + } + + @Override + public State process(Offline msg) { + String host = msg.getSource(); + + if (host != null && !host.equals(getHost())) { + logger.warn("host {} offline on topic {}", host, getTopic()); + alive.remove(host); + setLeader(alive.first()); + + } else { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + } return null; } /** - * Records info from a message, adding the source host name to - * {@link #alive}, and updating the bucket assignments. + * Records info from a message, adding the source host name to {@link #alive}, and + * updating the bucket assignments. * * @param source the message's source host * @param assignments assignments, or {@code null} @@ -181,29 +204,18 @@ public class QueryState extends ProcessingState { // record assignments, if we don't have any yet BucketAssignments current = getAssignments(); if (current == null) { + logger.info("received initial assignments on topic {}", getTopic()); setAssignments(assignments); return; } /* - * Record assignments, if the new assignments have a better (i.e., - * lesser) leader. + * Record assignments, if the new assignments have a better (i.e., lesser) leader. */ String curldr = current.getLeader(); - if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) { + if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) { + logger.info("use new assignments from {} on topic {}", source, getTopic()); setAssignments(assignments); } } - - @Override - public State process(Offline msg) { - String host = msg.getSource(); - - if (host != null && !host.equals(getHost())) { - alive.remove(host); - setLeader(alive.first()); - } - - return null; - } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index decbdfda..a978e245 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -28,19 +28,19 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; import java.util.Map; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; -import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; -import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The start state. Upon entry, a heart beat is generated and the event filter - * is changed to look for just that particular message. Once the message is - * seen, it goes into the {@link QueryState}. + * The start state. Upon entry, a heart beat is generated and the event filter is changed + * to look for just that particular message. Once the message is seen, it goes into the + * {@link QueryState}. */ public class StartState extends State { + private static final Logger logger = LoggerFactory.getLogger(StartState.class); + /** * Time stamp inserted into the heart beat message. */ @@ -69,54 +69,28 @@ public class StartState extends State { publish(getHost(), makeHeartbeat(hbTimestampMs)); - schedule(getProperties().getStartHeartbeatMs(), xxx -> internalTopicFailed()); + schedule(getProperties().getStartHeartbeatMs(), () -> { + logger.error("missed heartbeat on topic {}", getTopic()); + return internalTopicFailed(); + }); } /** - * Transitions to the query state if the heart beat originated from this - * host and its time stamp matches. + * Transitions to the query state if the heart beat originated from this host and its + * time stamp matches. */ @Override public State process(Heartbeat msg) { if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) { // saw our own heart beat - transition to query state + logger.info("saw our own heartbeat on topic {}", getTopic()); publish(makeQuery()); return goQuery(); - } - - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Identification msg) { - return null; - } - - /** - * Processes the assignments, but remains in the current state. - */ - @Override - public State process(Leader msg) { - super.process(msg); - return null; - } - /** - * Discards the message. - */ - @Override - public State process(Offline msg) { - return null; - } + } else { + logger.info("ignored old heartbeat message from {} on topic {}", msg.getSource(), getTopic()); + } - /** - * Discards the message. - */ - @Override - public State process(Query msg) { return null; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 1e3a907e..421b9225 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -26,7 +26,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -37,16 +37,19 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A state in the finite state machine. * <p> - * A state may have several timers associated with it, which must be cancelled - * whenever the state is changed. Assumes that timers are not continuously added - * to the same state. + * A state may have several timers associated with it, which must be cancelled whenever + * the state is changed. Assumes that timers are not continuously added to the same state. */ public abstract class State { + private static final Logger logger = LoggerFactory.getLogger(State.class); + /** * Host pool manager. */ @@ -55,7 +58,7 @@ public abstract class State { /** * Timers added by this state. */ - private final List<ScheduledFuture<?>> timers = new LinkedList<>(); + private final List<CancellableScheduledTask> timers = new LinkedList<>(); /** * @@ -66,9 +69,9 @@ public abstract class State { } /** - * Gets the server-side filter to use when polling the DMaaP internal topic. - * The default method returns a filter that accepts messages on the admin - * channel and on the host's own channel. + * Gets the server-side filter to use when polling the DMaaP internal topic. The + * default method returns a filter that accepts messages on the admin channel and on + * the host's own channel. * * @return the server-side filter to use. */ @@ -80,25 +83,15 @@ public abstract class State { /** * Cancels the timers added by this state. */ - public void cancelTimers() { - for (ScheduledFuture<?> fut : timers) { - fut.cancel(false); - } + public final void cancelTimers() { + timers.forEach(timer -> timer.cancel()); } /** - * Starts the state. + * Starts the state. The default method simply logs a message and returns. */ public void start() { - - } - - /** - * Indicates that the finite state machine is stopping. Sends an "offline" - * message to the other hosts. - */ - public void stop() { - publish(makeOffline()); + logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic()); } /** @@ -106,7 +99,7 @@ public abstract class State { * * @return the new state */ - public State goStart() { + public final State goStart() { return mgr.goStart(); } @@ -124,7 +117,7 @@ public abstract class State { * * @return the new state */ - public State goActive() { + public final State goActive() { return mgr.goActive(); } @@ -138,13 +131,14 @@ public abstract class State { } /** - * Processes a message. The default method passes it to the manager to - * handle and returns {@code null}. + * Processes a message. The default method passes it to the manager to handle and + * returns {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); mgr.handle(msg); return null; } @@ -156,6 +150,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Heartbeat msg) { + logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -166,41 +161,54 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Identification msg) { + logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic()); return null; } /** - * Processes a message. If this host has a new assignment, then it - * transitions to the active state. Otherwise, it transitions to the - * inactive state. + * Processes a message. The default method copies the assignments and then returns + * {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Leader msg) { + if (isValid(msg)) { + logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + } + + return null; + } + + /** + * Determines if a message is valid and did not originate from this host. + * + * @param msg message to be validated + * @return {@code true} if the message is valid, {@code false} otherwise + */ + protected boolean isValid(Leader msg) { BucketAssignments asgn = msg.getAssignments(); if (asgn == null) { - return null; + logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic()); + return false; } + // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null) { - return null; + if (source == null || source.equals(getHost())) { + logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic()); + return false; } // the new leader must equal the source - if (source.equals(asgn.getLeader())) { - startDistributing(asgn); - - if (asgn.hasAssignment(getHost())) { - return goActive(); + boolean result = source.equals(asgn.getLeader()); - } else { - return goInactive(); - } + if (!result) { + logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic()); } - return null; + return result; } /** @@ -210,6 +218,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -220,6 +229,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -228,7 +238,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Identification msg) { + protected final void publish(Identification msg) { mgr.publishAdmin(msg); } @@ -237,7 +247,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Leader msg) { + protected final void publish(Leader msg) { mgr.publishAdmin(msg); } @@ -246,7 +256,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Offline msg) { + protected final void publish(Offline msg) { mgr.publishAdmin(msg); } @@ -255,7 +265,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Query msg) { + protected final void publish(Query msg) { mgr.publishAdmin(msg); } @@ -265,7 +275,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Forward msg) { + protected final void publish(String channel, Forward msg) { mgr.publish(channel, msg); } @@ -275,7 +285,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Heartbeat msg) { + protected final void publish(String channel, Heartbeat msg) { mgr.publish(channel, msg); } @@ -284,7 +294,7 @@ public abstract class State { * * @param assignments */ - protected void startDistributing(BucketAssignments assignments) { + protected final void startDistributing(BucketAssignments assignments) { if (assignments != null) { mgr.startDistributing(assignments); } @@ -296,7 +306,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void schedule(long delayMs, StateTimerTask task) { + protected final void schedule(long delayMs, StateTimerTask task) { timers.add(mgr.schedule(delayMs, task)); } @@ -307,7 +317,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); } @@ -316,7 +326,7 @@ public abstract class State { * * @return a new {@link InactiveState} */ - protected State internalTopicFailed() { + protected final State internalTopicFailed() { publish(makeOffline()); mgr.internalTopicFailed(); @@ -330,16 +340,25 @@ public abstract class State { * * @return a new message */ - protected Heartbeat makeHeartbeat(long timestampMs) { + protected final Heartbeat makeHeartbeat(long timestampMs) { return new Heartbeat(getHost(), timestampMs); } /** + * Makes an Identification message. + * + * @return a new message + */ + protected Identification makeIdentification() { + return new Identification(getHost(), getAssignments()); + } + + /** * Makes an "offline" message. * * @return a new message */ - protected Offline makeOffline() { + protected final Offline makeOffline() { return new Offline(getHost()); } @@ -348,7 +367,7 @@ public abstract class State { * * @return a new message */ - protected Query makeQuery() { + protected final Query makeQuery() { return new Query(getHost()); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java index bd388b4e..346d4496 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java @@ -29,9 +29,8 @@ public interface StateTimerTask { /** * Fires the timer. * - * @param arg always {@code null} * @return the new state, or {@code null} if the state is unchanged */ - public State fire(Void arg); + public State fire(); } |