diff options
Diffstat (limited to 'feature-pooling-dmaap/src')
40 files changed, 2493 insertions, 1168 deletions
diff --git a/feature-pooling-dmaap/src/assembly/assemble_zip.xml b/feature-pooling-dmaap/src/assembly/assemble_zip.xml new file mode 100644 index 00000000..9908a2b9 --- /dev/null +++ b/feature-pooling-dmaap/src/assembly/assemble_zip.xml @@ -0,0 +1,76 @@ +<!-- + ============LICENSE_START======================================================= + feature-pooling-dmaap + ================================================================================ + Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Defines how we build the .zip file which is our distribution. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>feature-pooling-dmaap</id> + <formats> + <format>zip</format> + </formats> + + <!-- we want "system" and related files right at the root level as this + file is suppose to be unzip on top of a karaf distro. --> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>target</directory> + <outputDirectory>lib/feature</outputDirectory> + <includes> + <include>feature-pooling-dmaap-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>target/assembly/lib</directory> + <outputDirectory>lib/dependencies</outputDirectory> + <includes> + <include>*.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/feature/config</directory> + <outputDirectory>config</outputDirectory> + <fileMode>0644</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/bin</directory> + <outputDirectory>bin</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/db</directory> + <outputDirectory>db</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/install</directory> + <outputDirectory>install</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + </fileSets> +</assembly> diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java index 428b5853..b16f44ad 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java @@ -1,5 +1,6 @@ /* * ============LICENSE_START======================================================= + * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -17,25 +18,16 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.drools.pooling.message; +package org.onap.policy.drools.pooling; -import org.junit.Test; -import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; -import com.fasterxml.jackson.databind.ObjectMapper; - -public class Trial { - - @Test - public void test() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - - Message msg = new Forward("me", CommInfrastructure.DMAAP, "my topic", "a message", "my req"); - - String enc = mapper.writeValueAsString(msg); - System.out.println("enc=" + enc); - - Message msg2 = mapper.readValue(enc, Message.class); - System.out.println("class=" + msg2.getClass()); - } +/** + * A scheduled task that can be cancelled. + */ +@FunctionalInterface +public interface CancellableScheduledTask { + /** + * Cancels the scheduled task. + */ + public void cancel(); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 98543f29..102eda75 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -58,14 +58,14 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the - * internal DMaaP topic. + * Topic sources. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSource> sources; /** - * Topic sinks. In theory, there's only one item in this list, the internal - * DMaaP topic. + * Topic sinks. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSink> sinks; @@ -112,8 +112,8 @@ public class DmaapManager { } /** - * Used by junit tests to set the factory used to create various objects - * used by this class. + * Used by junit tests to set the factory used to create various objects used by this + * class. * * @param factory the new factory */ @@ -129,8 +129,7 @@ public class DmaapManager { * Finds the topic source associated with the internal DMaaP topic. * * @return the topic source - * @throws PoolingFeatureException if the source doesn't exist or is not - * filterable + * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { for (TopicSource src : sources) { @@ -174,6 +173,7 @@ public class DmaapManager { } try { + logger.info("start publishing to topic {}", topic); topicSink.start(); publishing = true; @@ -184,13 +184,28 @@ public class DmaapManager { /** * Stops the publisher. + * + * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued + * messages and close */ - public void stopPublisher() { + public void stopPublisher(long waitMs) { if (!publishing) { return; } + /* + * Give the sink a chance to transmit messages in the queue. It would be better if + * "waitMs" could be passed to sink.stop(), but that isn't an option at this time. + */ + try { + Thread.sleep(waitMs); + + } catch (InterruptedException e) { + logger.warn("message transmission stopped due to {}", e.getMessage()); + } + try { + logger.info("stop publishing to topic {}", topic); publishing = false; topicSink.stop(); @@ -209,6 +224,7 @@ public class DmaapManager { return; } + logger.info("start consuming from topic {}", topic); topicSource.register(listener); consuming = true; } @@ -223,6 +239,7 @@ public class DmaapManager { return; } + logger.info("stop consuming from topic {}", topic); consuming = false; topicSource.unregister(listener); } @@ -230,16 +247,16 @@ public class DmaapManager { /** * Sets the server-side filter to be used by the consumer. * - * @param filter the filter string, or {@code null} if no filter is to be - * used + * @param filter the filter string, or {@code null} if no filter is to be used * @throws PoolingFeatureException if the topic is not filterable */ public void setFilter(String filter) throws PoolingFeatureException { try { + logger.debug("change filter for topic {} to {}", topic, filter); topicSource.setFilter(filter); } catch (UnsupportedOperationException e) { - throw new PoolingFeatureException("cannot filter topic " + topic); + throw new PoolingFeatureException("cannot filter topic " + topic, e); } } @@ -247,8 +264,7 @@ public class DmaapManager { * Publishes a message to the sink. * * @param msg message to be published - * @throws PoolingFeatureException if an error occurs or the publisher isn't - * running + * @throws PoolingFeatureException if an error occurs or the publisher isn't running */ public void publish(String msg) throws PoolingFeatureException { if (!publishing) { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java deleted file mode 100644 index d2f32043..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -// TODO move to policy-utils - -import java.lang.annotation.Annotation; -import java.lang.reflect.Field; -import java.util.Properties; -import org.onap.policy.common.utils.properties.SpecPropertyConfiguration; -import org.onap.policy.common.utils.properties.exception.PropertyException; - -/** - * Checks whether or not a feature is enabled. The name of the "enable" property - * is assumed to be of the form accepted by a {@link SpecPropertyConfiguration}, - * which contains a substitution place-holder into which a "specializer" (e.g., - * controller or session name) is substituted. - */ -public class FeatureEnabledChecker { - - /** - * - */ - private FeatureEnabledChecker() { - super(); - } - - /** - * Determines if a feature is enabled for a particular specializer. - * - * @param props properties from which to extract the "enabled" flag - * @param specializer specializer to be substituted into the property name - * when extracting - * @param propName the name of the "enabled" property - * @return {@code true} if the feature is enabled, or {@code false} if it is - * not enabled (or if the property doesn't exist) - * @throws IllegalArgumentException if the "enabled" property is not a - * boolean value - */ - public static boolean isFeatureEnabled(Properties props, String specializer, String propName) { - - try { - return new Config(specializer, props, propName).isEnabled(); - - } catch (PropertyException e) { - throw new IllegalArgumentException("cannot check property " + propName, e); - } - } - - - /** - * Configuration used to extract the value. - */ - private static class Config extends SpecPropertyConfiguration { - - /** - * There is a bit of trickery here. This annotation is just a - * place-holder to get the superclass to invoke the - * {@link #setValue(java.lang.reflect.Field, Properties, Property) - * setValue()} method. When that's invoked, we'll substitute - * {@link #propOverride} instead of this annotation. - */ - @Property(name = "feature-enabled-property-place-holder") - private boolean enabled; - - /** - * Annotation that will actually be used to set the field. - */ - private Property propOverride; - - /** - * - * @param specializer specializer to be substituted into the property - * name when extracting - * @param props properties from which to extract the "enabled" flag - * @param propName the name of the "enabled" property - * @throws PropertyException if an error occurs - */ - public Config(String specializer, Properties props, String propName) throws PropertyException { - super(specializer); - - propOverride = new Property() { - - @Override - public String name() { - return propName; - } - - @Override - public String defaultValue() { - // feature is disabled by default - return "false"; - } - - @Override - public String accept() { - return ""; - } - - @Override - public Class<? extends Annotation> annotationType() { - return Property.class; - } - }; - - setAllFields(props); - } - - /** - * Substitutes {@link #propOverride} for "prop". - */ - @Override - protected boolean setValue(Field field, Properties props, Property prop) throws PropertyException { - return super.setValue(field, props, propOverride); - } - - /** - * - * @return {@code true} if the feature is enabled, {@code false} - * otherwise - */ - public boolean isEnabled() { - return enabled; - } - }; -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index da47a031..21cbc4db 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -20,44 +20,41 @@ package org.onap.policy.drools.pooling; -import java.io.IOException; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; -import org.onap.policy.drools.core.PolicySessionFeatureAPI; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.persistence.SystemPersistence; import org.onap.policy.drools.system.PolicyController; -import org.onap.policy.drools.utils.PropertyUtil; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.util.FeatureEnabledChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Controller/session pooling. Multiple hosts may be launched, all servicing the - * same controllers/sessions. When this feature is enabled, the requests are - * divided across the different hosts, instead of all running on a single, - * active host. + * Controller/session pooling. Multiple hosts may be launched, all servicing the same + * controllers/sessions. When this feature is enabled, the requests are divided across the + * different hosts, instead of all running on a single, active host. * <p> - * With each controller, there is an associated DMaaP topic that is used for - * internal communication between the different hosts serving the controller. + * With each controller, there is an associated DMaaP topic that is used for internal + * communication between the different hosts serving the controller. */ -public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI { +public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI { private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class); - // TODO state-management doesn't allow more than one active host at a time - /** * Factory used to create objects. */ private static Factory factory; /** - * Entire set of feature properties, including those specific to various - * controllers. + * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; @@ -67,9 +64,9 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); /** - * Arguments passed to beforeOffer(), which are saved for when the - * beforeInsert() is called later. As multiple threads can be active within - * the methods at the same time, we must keep this in thread local storage. + * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is + * called later. As multiple threads can be active within the methods at the same + * time, we must keep this in thread local storage. */ private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>(); @@ -98,20 +95,11 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl return 0; } - /** - * @throws PoolingFeatureRtException if the properties cannot be read or are - * invalid - */ @Override - public void globalInit(String[] args, String configDir) { - logger.info("initializing pooling feature"); - - try { - featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties"); - - } catch (IOException ex) { - throw new PoolingFeatureRtException(ex); - } + public boolean beforeStart(PolicyEngine engine) { + logger.info("initializing " + PoolingProperties.FEATURE_NAME); + featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); + return false; } /** @@ -205,8 +193,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl @Override public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) { /* - * As this is invoked a lot, we'll directly call the manager's method - * instead of using the functional interface via doManager(). + * As this is invoked a lot, we'll directly call the manager's method instead of + * using the functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { @@ -226,6 +214,7 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl OfferArgs args = offerArgs.get(); if (args == null) { + logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert"); return false; } @@ -234,16 +223,21 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl controller = factory.getController(droolsController); } catch (IllegalArgumentException | IllegalStateException e) { + logger.warn("cannot get controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId(), e); return false; } + if (controller == null) { + logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(), + droolsController.getArtifactId()); return false; } /* - * As this is invoked a lot, we'll directly call the manager's method - * instead of using the functional interface via doManager(). + * As this is invoked a lot, we'll directly call the manager's method instead of + * using the functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { @@ -264,14 +258,12 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl } /** - * Executes a function using the manager associated with the controller. - * Catches any exceptions from the function and re-throws it as a runtime - * exception. + * Executes a function using the manager associated with the controller. Catches any + * exceptions from the function and re-throws it as a runtime exception. * * @param controller * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} - * otherwise + * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doManager(PolicyController controller, MgrFunc func) { @@ -284,26 +276,28 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl return func.apply(mgr); } catch (PoolingFeatureException e) { - throw e.toRuntimeException(); + throw new PoolingFeatureRtException(e); } } /** - * Executes a function using the manager associated with the controller and - * then deletes the manager. Catches any exceptions from the function and - * re-throws it as a runtime exception. + * Executes a function using the manager associated with the controller and then + * deletes the manager. Catches any exceptions from the function and re-throws it as a + * runtime exception. * * @param controller * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} - * otherwise + * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) { + String name = controller.getName(); + logger.info("remove feature-pool-dmaap manager for {}", name); + // NOTE: using "remove()" instead of "get()" - PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName()); + PoolingManagerImpl mgr = ctlr2pool.remove(name); if (mgr == null) { return false; @@ -321,8 +315,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl /** * * @param mgr - * @return {@code true} if the request was handled by the manager, - * {@code false} otherwise + * @return {@code true} if the request was handled by the manager, {@code false} + * otherwise * @throws PoolingFeatureException */ public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; @@ -367,6 +361,14 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl public static class Factory { /** + * @param featName feature name + * @return the properties for the specified feature + */ + public Properties getProperties(String featName) { + return SystemPersistence.manager.getProperties(featName); + } + + /** * Makes a pooling manager for a controller. * * @param controller diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java index 5efd1414..c3c81879 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java @@ -47,13 +47,4 @@ public class PoolingFeatureException extends Exception { super(message, cause, enableSuppression, writableStackTrace); } - /** - * Converts the exception to a runtime exception. - * - * @return a new runtime exception, wrapping this exception - */ - public PoolingFeatureRtException toRuntimeException() { - return new PoolingFeatureRtException(this); - } - } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index de08d1e1..5036b605 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Message; @@ -48,19 +47,19 @@ public interface PoolingManager { public String getHost(); /** - * Gets the name of the internal DMaaP topic used by this manager to - * communicate with its other hosts. + * Gets the name of the internal DMaaP topic used by this manager to communicate with + * its other hosts. * * @return the name of the internal DMaaP topic */ public String getTopic(); /** - * Indicates that communication with internal DMaaP topic failed, typically - * due to a missed heart beat. Stops the PolicyController. + * Indicates that communication with internal DMaaP topic failed, typically due to a + * missed heart beat. Stops the PolicyController. * - * @return a latch that can be used to determine when the controller's - * stop() method has completed + * @return a latch that can be used to determine when the controller's stop() method + * has completed */ public CountDownLatch internalTopicFailed(); @@ -68,14 +67,16 @@ public interface PoolingManager { * Starts distributing requests according to the given bucket assignments. * * @param assignments must <i>not</i> be {@code null} + * @return a latch that can be used to determine when the events in the event queue + * have all be processed */ - public void startDistributing(BucketAssignments assignments); + public CountDownLatch startDistributing(BucketAssignments assignments); /** * Gets the current bucket assignments. * - * @return the current bucket assignments, or {@code null} if no assignments - * have been made + * @return the current bucket assignments, or {@code null} if no assignments have been + * made */ public BucketAssignments getAssignments(); @@ -95,8 +96,7 @@ public interface PoolingManager { public void publish(String channel, Message msg); /** - * Handles a {@link Forward} event that was received from the internal - * topic. + * Handles a {@link Forward} event that was received from the internal topic. * * @param event */ @@ -107,9 +107,9 @@ public interface PoolingManager { * * @param delayMs delay, in milliseconds * @param task - * @return a future that can be used to cancel the timer + * @return a new scheduled task */ - public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task); + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task); /** * Schedules a timer to fire repeatedly. @@ -117,9 +117,9 @@ public interface PoolingManager { * @param initialDelayMs initial delay, in milliseconds * @param delayMs delay, in milliseconds * @param task - * @return a future that can be used to cancel the timer + * @return a new scheduled task */ - public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task); /** * Transitions to the "start" state. diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index cd71670d..422efdd7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -38,6 +39,7 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.state.ActiveState; import org.onap.policy.drools.pooling.state.IdleState; import org.onap.policy.drools.pooling.state.InactiveState; @@ -52,39 +54,30 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; /** - * Implementation of a {@link PoolingManager}. Until bucket assignments have - * been made, events coming from external topics are saved in a queue for later - * processing. Once assignments are made, the saved events are processed. In - * addition, while the controller is locked, events are still forwarded to other - * hosts and bucket assignments are still updated, based on any {@link Leader} - * messages that it receives. + * Implementation of a {@link PoolingManager}. Until bucket assignments have been made, + * events coming from external topics are saved in a queue for later processing. Once + * assignments are made, the saved events are processed. In addition, while the controller + * is locked, events are still forwarded to other hosts and bucket assignments are still + * updated, based on any {@link Leader} messages that it receives. */ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class); - // TODO metrics, audit logging - /** * Maximum number of times a message can be forwarded. */ public static final int MAX_HOPS = 5; /** - * Type of item that the extractors will be extracting. - */ - private static final String EXTRACTOR_TYPE = "requestId"; - - /** - * Prefix for extractor properties. + * Factory used to create various objects. Can be overridden during junit testing. */ - private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; + private static Factory factory = new Factory(); /** - * Factory used to create various objects. Can be overridden during junit - * testing. + * ID of the last host that was created. */ - private static Factory factory = new Factory(); + private static final AtomicReference<String> lastHost = new AtomicReference<>(null); /** * ID of this host. @@ -102,14 +95,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final PolicyController controller; /** - * Where to offer events that have been forwarded to this host (i.e, the - * controller). + * Where to offer events that have been forwarded to this host (i.e, the controller). */ private final TopicListener listener; /** - * Used to encode & decode request objects received from & sent to a rule - * engine. + * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -129,19 +120,18 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final ClassExtractors extractors; /** - * Lock used while updating {@link #current}. In general, public methods - * must use this, while private methods assume the lock is already held. + * Lock used while updating {@link #current}. In general, public methods must use + * this, while private methods assume the lock is already held. */ private final Object curLocker = new Object(); /** * Current state. * <p> - * This uses a finite state machine, wherein the state object contains all - * of the data relevant to that state. Each state object has a process() - * method, specific to each type of {@link Message} subclass. The method - * returns the next state object, or {@code null} if the state is to remain - * the same. + * This uses a finite state machine, wherein the state object contains all of the data + * relevant to that state. Each state object has a process() method, specific to each + * type of {@link Message} subclass. The method returns the next state object, or + * {@code null} if the state is to remain the same. */ private State current; @@ -177,13 +167,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.controller = controller; this.props = props; + lastHost.set(this.host); + try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName()); + SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), + props.getSource()); this.extractors = factory.makeClassExtractors(spec); this.dmaapMgr = factory.makeDmaapManager(props); @@ -197,7 +190,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (PoolingFeatureException e) { logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName()); - throw e.toRuntimeException(); + throw new PoolingFeatureRtException(e); } } @@ -210,6 +203,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Used by junit tests. + * + * @return the ID of the last host that was created, or {@code null} if no hosts have + * been created yet + */ + protected static String getLastHost() { + return lastHost.get(); + } + + /** * Should only be used by junit tests. * * @return the current state @@ -234,22 +237,22 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to start. Starts the publisher for - * the internal topic, and creates a thread pool for the timers. + * Indicates that the controller is about to start. Starts the publisher for the + * internal topic, and creates a thread pool for the timers. * - * @throws PoolingFeatureException if the internal topic publisher cannot be - * started + * @throws PoolingFeatureException if the internal topic publisher cannot be started */ public void beforeStart() throws PoolingFeatureException { synchronized (curLocker) { if (scheduler == null) { dmaapMgr.startPublisher(); + logger.debug("make scheduler thread for topic {}", getTopic()); scheduler = factory.makeScheduler(); /* - * Only a handful of timers at any moment, thus we can afford to - * take the time to remove them when they're cancelled. + * Only a handful of timers at any moment, thus we can afford to take the + * time to remove them when they're cancelled. */ scheduler.setRemoveOnCancelPolicy(true); scheduler.setMaximumPoolSize(1); @@ -260,9 +263,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller has successfully started. Starts the - * consumer for the internal topic, enters the {@link StartState}, and sets - * the filter for the initial state. + * Indicates that the controller has successfully started. Starts the consumer for the + * internal topic, enters the {@link StartState}, and sets the filter for the initial + * state. */ public void afterStart() { synchronized (curLocker) { @@ -274,8 +277,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Indicates that the controller is about to stop. Stops the consumer, the - * scheduler, and the current state. + * Indicates that the controller is about to stop. Stops the consumer, the scheduler, + * and the current state. */ public void beforeStop() { ScheduledThreadPoolExecutor sched; @@ -287,23 +290,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { if (!(current instanceof IdleState)) { dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); - - // TODO - /* - * Need a brief delay here to allow "offline" message to be - * transmitted? - */ + publishAdmin(new Offline(getHost())); } } if (sched != null) { + logger.debug("stop scheduler for topic {}", getTopic()); sched.shutdownNow(); } } /** - * Indicates that the controller has stopped. Stops the publisher and logs a - * warning if any events are still in the queue. + * Indicates that the controller has stopped. Stops the publisher and logs a warning + * if any events are still in the queue. */ public void afterStop() { synchronized (curLocker) { @@ -312,25 +311,33 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { eventq.clear(); } - dmaapMgr.stopPublisher(); + /* + * stop the publisher, but allow time for any Offline message to be + * transmitted + */ + dmaapMgr.stopPublisher(props.getOfflinePubWaitMs()); } } /** - * Indicates that the controller is about to be locked. Enters the idle - * state, as all it will be doing is forwarding messages. + * Indicates that the controller is about to be locked. Enters the idle state, as all + * it will be doing is forwarding messages. */ public void beforeLock() { + logger.info("locking manager for topic {}", getTopic()); + synchronized (curLocker) { changeState(new IdleState(this)); } } /** - * Indicates that the controller has been unlocked. Enters the start state, - * if the controller is running. + * Indicates that the controller has been unlocked. Enters the start state, if the + * controller is running. */ public void afterUnlock() { + logger.info("unlocking manager for topic {}", getTopic()); + synchronized (curLocker) { if (controller.isAlive() && current instanceof IdleState && scheduler != null) { changeState(new StartState(this)); @@ -339,8 +346,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Changes the finite state machine to a new state, provided the new state - * is not {@code null}. + * Changes the finite state machine to a new state, provided the new state is not + * {@code null}. * * @param newState new state, or {@code null} if to remain unchanged */ @@ -379,30 +386,46 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { CountDownLatch latch = new CountDownLatch(1); /* - * We don't want to build up items in our queue if we can't forward them - * to other hosts, so we just stop the controller. + * We don't want to build up items in our queue if we can't forward them to other + * hosts, so we just stop the controller. * * Use a background thread to prevent deadlocks. */ - new Thread() { - @Override - public void run() { - controller.stop(); - latch.countDown(); - } - }.start(); + new Thread(() -> { + controller.stop(); + latch.countDown(); + }).start(); return latch; } @Override - public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) { - return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override - public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { - return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS); + public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + // wrap the task in a TimerAction and schedule it + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, + TimeUnit.MILLISECONDS); + + // wrap the future in a "CancellableScheduledTask" + return new CancellableScheduledTask() { + @Override + public void cancel() { + fut.cancel(false); + } + }; } @Override @@ -412,6 +435,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public void publish(String channel, Message msg) { + logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic()); + msg.setChannel(channel); try { @@ -434,8 +459,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param topic2 * @param event - * @return {@code true} if the event was handled, {@code false} if the - * controller should handle it + * @return {@code true} if the event was handled, {@code false} if the controller + * should handle it */ @Override public void onTopicEvent(CommInfrastructure commType, String topic2, String event) { @@ -452,20 +477,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the PolicyController before it offers the event to the - * DroolsController. If the controller is locked, then it isn't processing - * events. However, they still need to be forwarded, thus in that case, they - * are decoded and forwarded. + * Called by the PolicyController before it offers the event to the DroolsController. + * If the controller is locked, then it isn't processing events. However, they still + * need to be forwarded, thus in that case, they are decoded and forwarded. * <p> - * On the other hand, if the controller is not locked, then we just return - * immediately and let {@link #beforeInsert(Object, String, String, Object) - * beforeInsert()} handle it instead, as it already has the decoded message. + * On the other hand, if the controller is not locked, then we just return immediately + * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle + * it instead, as it already has the decoded message. * * @param protocol * @param topic2 * @param event - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { @@ -478,15 +502,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Called by the DroolsController before it inserts the event into the rule - * engine. + * Called by the DroolsController before it inserts the event into the rule engine. * * @param protocol * @param topic2 * @param event original event text, as received from the Bus * @param event2 event, as an object - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) { @@ -504,10 +527,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param protocol * @param topic2 * @param event - * @param reqid request id extracted from the event, or {@code null} if it - * couldn't be extracted - * @return {@code true} if the event was handled by the manager, - * {@code false} if it must still be handled by the invoker + * @param reqid request id extracted from the event, or {@code null} if it couldn't be + * extracted + * @return {@code true} if the event was handled by the manager, {@code false} if it + * must still be handled by the invoker */ private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) { if (reqid == null) { @@ -524,6 +547,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { Forward ev = makeForward(protocol, topic2, event, reqid); if (ev == null) { // invalid args - consume the message + logger.warn("constructed an invalid Forward message on topic {}", getTopic()); return true; } @@ -536,12 +560,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles an event from an external topic. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleExternal(Forward event) { if (assignments == null) { // no bucket assignments yet - add it to the queue + logger.info("queued event for request {}", event.getRequestId()); eventq.add(event); // we've consumed the event @@ -556,17 +581,17 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Handles a {@link Forward} event, possibly forwarding it again. * * @param event - * @return {@code true} if the event was handled, {@code false} if the - * invoker should handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should + * handle it */ private boolean handleEvent(Forward event) { - int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size(); - String target = assignments.getAssignedHost(bucket); + String target = assignments.getAssignedHost(event.getRequestId().hashCode()); if (target == null) { /* * This bucket has no assignment - just discard the event */ + logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic()); return true; } @@ -574,6 +599,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /* * Message belongs to this host - allow the controller to handle it. */ + logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic()); return false; } @@ -583,6 +609,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { + logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -678,16 +705,21 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param event */ private void inject(Forward event) { - intercept = false; - listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic()); - intercept = true; + try { + intercept = false; + listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); + + } finally { + intercept = true; + } } /** - * Handles an event from the internal topic. This uses reflection to - * identify the appropriate process() method to invoke, based on the type of - * Message that was decoded. + * Handles an event from the internal topic. This uses reflection to identify the + * appropriate process() method to invoke, based on the type of Message that was + * decoded. * * @param event the serialized {@link Message} read from the internal topic */ @@ -711,29 +743,48 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } catch (NoSuchMethodException | SecurityException e) { logger.error("no processor for message {} for topic {}", clazz, topic, e); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - logger.error("failed to process message {} for topic {}", clazz, topic, e); - - } catch (PoolingFeatureException e) { + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | PoolingFeatureException e) { logger.error("failed to process message {} for topic {}", clazz, topic, e); } } @Override - public void startDistributing(BucketAssignments assignments) { - if (assignments == null) { - return; + public CountDownLatch startDistributing(BucketAssignments asgn) { + if (asgn == null) { + return null; } + logger.info("new assignments for topic {}", getTopic()); + synchronized (curLocker) { - this.assignments = assignments; + assignments = asgn; + } + + /* + * publish the events from the event queue, but do it in a background thread so + * that the state machine can enter its correct state BEFORE we start processing + * the events + */ + CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + synchronized (curLocker) { + if (assignments == null) { + return; + } - // now that we have assignments, we can process the queue - Forward ev; - while ((ev = eventq.poll()) != null) { - handle(ev); + // now that we have assignments, we can process the queue + Forward ev; + while ((ev = eventq.poll()) != null) { + handle(ev); + } + + latch.countDown(); } - } + }).start(); + + return latch; } @Override @@ -762,8 +813,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Action to run a timer task. Only runs the task if the machine is still in - * the state that it was in when the timer was created. + * Action to run a timer task. Only runs the task if the machine is still in the state + * that it was in when the timer was created. */ private class TimerAction implements Runnable { @@ -790,7 +841,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { public void run() { synchronized (curLocker) { if (current == origState) { - changeState(task.fire(null)); + changeState(task.fire()); } } } @@ -818,7 +869,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @return a new set of extractors */ public ClassExtractors makeClassExtractors(Properties props) { - return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE); + return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX, + PoolingProperties.EXTRACTOR_TYPE); } /** @@ -846,8 +898,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * * @param drools drools controller * @param topic topic on which the event was received - * @return {@code true} if the event can be decoded, {@code false} - * otherwise + * @return {@code true} if the event can be decoded, {@code false} otherwise */ public boolean canDecodeEvent(DroolsController drools, String topic) { return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java index 1cbe5cb9..4e8de0d0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java @@ -30,6 +30,11 @@ import org.onap.policy.common.utils.properties.exception.PropertyException; public class PoolingProperties extends SpecPropertyConfiguration { /** + * The feature name, used to retrieve properties. + */ + public static final String FEATURE_NAME = "feature-pooling-dmaap"; + + /** * Feature properties all begin with this prefix. */ public static final String PREFIX = "pooling."; @@ -51,6 +56,17 @@ public class PoolingProperties extends SpecPropertyConfiguration { public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds"; public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds"; public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds"; + public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds"; + + /** + * Type of item that the extractors will be extracting. + */ + public static final String EXTRACTOR_TYPE = "requestId"; + + /** + * Prefix for extractor properties. + */ + public static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE; /** * Properties from which this was constructed. @@ -113,6 +129,13 @@ public class PoolingProperties extends SpecPropertyConfiguration { private long interHeartbeatMs; /** + * Time, in milliseconds, to wait for an "Offline" message to be published + * to DMaaP. + */ + @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000") + private long offlinePubWaitMs; + + /** * @param controllerName the name of the controller * @param props set of properties used to configure this * @throws PropertyException if an error occurs @@ -159,4 +182,8 @@ public class PoolingProperties extends SpecPropertyConfiguration { public long getInterHeartbeatMs() { return interHeartbeatMs; } + + public long getOfflinePubWaitMs() { + return offlinePubWaitMs; + } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index e4557404..31b4e614 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -23,8 +23,7 @@ package org.onap.policy.drools.pooling; import java.util.Properties; /** - * Properties with an optional specialization (e.g., session name, controller - * name). + * Properties with an optional specialization (e.g., session name, controller name). */ public class SpecProperties extends Properties { private static final long serialVersionUID = 1L; @@ -41,10 +40,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any - * specialization - * @param specialization the property name specialization (e.g., session - * name) + * @param prefix the property name prefix that appears before any specialization + * @param specialization the property name specialization (e.g., session name) */ public SpecProperties(String prefix, String specialization) { this.prefix = withTrailingDot(prefix); @@ -53,10 +50,8 @@ public class SpecProperties extends Properties { /** * - * @param prefix the property name prefix that appears before any - * specialization - * @param specialization the property name specialization (e.g., session - * name) + * @param prefix the property name prefix that appears before any specialization + * @param specialization the property name specialization (e.g., session name) * @param props the default properties */ public SpecProperties(String prefix, String specialization, Properties props) { @@ -77,13 +72,14 @@ public class SpecProperties extends Properties { } /** - * Gets the property whose value has the given key, looking first for the - * specialized property name, and then for the generalized property name. + * Gets the property whose value has the given key, looking first for the specialized + * property name, and then for the generalized property name. * * @param key property name, without the specialization - * @return the value from the property set, or {@code null} if the property - * set does not contain the value + * @return the value from the property set, or {@code null} if the property set does + * not contain the value */ + @Override public String getProperty(String key) { if (!key.startsWith(prefix)) { return super.getProperty(key); @@ -106,4 +102,14 @@ public class SpecProperties extends Properties { protected String getSpecPrefix() { return specPrefix; } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java index cb12a6ac..2752ca8c 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java @@ -152,7 +152,7 @@ public class ClassExtractors { * cannot extract data items from objects of this type, so just * allocated a null extractor. */ - logger.warn("missing property " + prefix + clazz.getName()); + logger.warn("missing property {}{}", prefix, clazz.getName()); return new NullExtractor(); } @@ -166,24 +166,24 @@ public class ClassExtractors { */ private Extractor buildExtractor(Class<?> clazz, String value) { if (!value.startsWith("${")) { - logger.warn("property value for " + prefix + clazz.getName() + " does not start with '${'"); + logger.warn("property value for {}{} does not start with '${'", prefix, clazz.getName()); return new NullExtractor(); } if (!value.endsWith("}")) { - logger.warn("property value for " + prefix + clazz.getName() + " does not end with '}'"); + logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName()); return new NullExtractor(); } // get the part in the middle String val = value.substring(2, value.length() - 1); if (val.startsWith(".")) { - logger.warn("property value for " + prefix + clazz.getName() + " begins with '.'"); + logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName()); return new NullExtractor(); } if (val.endsWith(".")) { - logger.warn("property value for " + prefix + clazz.getName() + " ends with '.'"); + logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName()); return new NullExtractor(); } @@ -198,7 +198,7 @@ public class ClassExtractors { return (ext.extractors.length == 1 ? ext.extractors[0] : ext); } catch (ExtractorException e) { - logger.warn("cannot build extractor for " + clazz.getName()); + logger.warn("cannot build extractor for {}", clazz.getName(), e); return new NullExtractor(); } } @@ -255,7 +255,7 @@ public class ClassExtractors { @Override public Object extract(Object object) { - logger.warn("cannot extract " + type + " from " + object.getClass()); + logger.warn("cannot extract {} from {}", type, object.getClass()); return null; } } @@ -374,6 +374,7 @@ public class ClassExtractors { } catch (NoSuchMethodException expected) { // no getXxx() method, maybe there's a field by this name + logger.debug("no method {} in {}", nm, clazz.getName()); return null; } catch (SecurityException e) { @@ -407,9 +408,8 @@ public class ClassExtractors { * @param key item key within the map * @return a new extractor, or {@code null} if the class is not a Map * subclass - * @throws ExtractorException */ - private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) throws ExtractorException { + private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) { if (!Map.class.isAssignableFrom(clazz)) { return null; @@ -445,6 +445,7 @@ public class ClassExtractors { } catch (NoSuchFieldException expected) { // no field by this name - try super class & interfaces + logger.debug("no field {} in {}", name, clazz.getName()); } catch (SecurityException e) { throw new ExtractorException("inaccessible field " + clazz + "." + name, e); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java index 9ed32ae4..38d440cb 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling.extractor; /** * Used to extract an object contained within another object. */ +@FunctionalInterface public interface Extractor { /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java index aff9d860..032ea47e 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java @@ -53,7 +53,7 @@ public class MapExtractor implements Extractor { return map.get(key); } else { - logger.warn("expecting a map, instead of " + object.getClass()); + logger.warn("expecting a map, instead of {}", object.getClass()); return null; } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java index 8fd86c1e..ee871cbd 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -37,9 +37,19 @@ public class BucketAssignments { private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class); /** - * Number of buckets. + * The number of bits in the maximum number of buckets. */ - public static final int MAX_BUCKETS = 1024; + private static final int MAX_BUCKET_BITS = 10; + + /** + * Maximum number of buckets. Must be a power of two. + */ + public static final int MAX_BUCKETS = 1 << MAX_BUCKET_BITS; + + /** + * Used to ensure that a hash code is not negative. + */ + private static final int MAX_BUCKETS_MASK = MAX_BUCKETS - 1; /** * Identifies the host serving a particular bucket. @@ -55,8 +65,8 @@ public class BucketAssignments { /** * - * @param hostArray maps a bucket number (i.e., array index) to a host. All - * values must be non-null + * @param hostArray maps a bucket number (i.e., array index) to a host. All values + * must be non-null */ public BucketAssignments(String[] hostArray) { this.hostArray = hostArray; @@ -97,8 +107,7 @@ public class BucketAssignments { * Determines if a host has an assignment. * * @param host host to be checked - * @return {@code true} if the host has an assignment, {@code false} - * otherwise + * @return {@code true} if the host has an assignment, {@code false} otherwise */ @JsonIgnore public boolean hasAssignment(String host) { @@ -139,23 +148,17 @@ public class BucketAssignments { /** * Gets the host assigned to a given bucket. * - * @param bucket bucket number - * @return the assigned host, or {@code null} if the bucket has no assigned - * host + * @param hashCode hash code of the item whose assignment is desired + * @return the assigned host, or {@code null} if the item has no assigned host */ @JsonIgnore - public String getAssignedHost(int bucket) { - if (hostArray == null) { + public String getAssignedHost(int hashCode) { + if (hostArray == null || hostArray.length == 0) { logger.error("no buckets have been assigned"); return null; } - if (bucket < 0 || bucket >= hostArray.length) { - logger.error("invalid bucket number {} maximum {}", bucket, hostArray.length); - return null; - } - - return hostArray[bucket]; + return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length]; } /** @@ -169,8 +172,8 @@ public class BucketAssignments { } /** - * Checks the validity of the assignments, verifying that all buckets have - * been assigned to a host. + * Checks the validity of the assignments, verifying that all buckets have been + * assigned to a host. * * @throws PoolingFeatureException if the assignments are invalid */ @@ -208,8 +211,6 @@ public class BucketAssignments { if (getClass() != obj.getClass()) return false; BucketAssignments other = (BucketAssignments) obj; - if (!Arrays.equals(hostArray, other.hostArray)) - return false; - return true; + return Arrays.equals(hostArray, other.hostArray); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java index b59cfbb2..6122d361 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java @@ -146,6 +146,7 @@ public class Forward extends Message { } + @Override @JsonIgnore public void checkValidity() throws PoolingFeatureException { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index 5f503a3b..b0a36cd9 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -24,16 +24,20 @@ import java.util.Arrays; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The active state. In this state, this host has one more more bucket - * assignments and processes any events associated with one of its buckets. - * Other events are forwarded to appropriate target hosts. + * The active state. In this state, this host has one more more bucket assignments and + * processes any events associated with one of its buckets. Other events are forwarded to + * appropriate target hosts. */ public class ActiveState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(ActiveState.class); + /** * Set of hosts that have been assigned a bucket. */ @@ -50,8 +54,8 @@ public class ActiveState extends ProcessingState { private String predHost = ""; /** - * {@code True} if we saw this host's heart beat since the last check, - * {@code false} otherwise. + * {@code True} if we saw this host's heart beat since the last check, {@code false} + * otherwise. */ private boolean myHeartbeatSeen = false; @@ -74,14 +78,14 @@ public class ActiveState extends ProcessingState { } /** - * Determine this host's neighbors based on the order of the host UUIDs. - * Updates {@link #succHost} and {@link #predHost}. + * Determine this host's neighbors based on the order of the host UUIDs. Updates + * {@link #succHost} and {@link #predHost}. */ private void detmNeighbors() { if (assigned.size() < 2) { + logger.info("this host has no neighbors on topic {}", getTopic()); /* - * this host is the only one with any assignments - it has no - * neighbors + * this host is the only one with any assignments - it has no neighbors */ succHost = null; predHost = ""; @@ -91,11 +95,13 @@ public class ActiveState extends ProcessingState { if ((succHost = assigned.higher(getHost())) == null) { // wrapped around - successor is the first host in the set succHost = assigned.first(); + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); } if ((predHost = assigned.lower(getHost())) == null) { // wrapped around - predecessor is the last host in the set predHost = assigned.last(); + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } } @@ -109,13 +115,14 @@ public class ActiveState extends ProcessingState { * Adds the timers. */ private void addTimers() { + logger.info("add timers"); /* * heart beat generator */ long genMs = getProperties().getActiveHeartbeatMs(); - scheduleWithFixedDelay(genMs, genMs, xxx -> { + scheduleWithFixedDelay(genMs, genMs, () -> { genHeartbeat(); return null; }); @@ -125,13 +132,14 @@ public class ActiveState extends ProcessingState { */ long interMs = getProperties().getInterHeartbeatMs(); - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (myHeartbeatSeen) { myHeartbeatSeen = false; return null; } // missed my heart beat + logger.error("missed my heartbeat on topic {}", getTopic()); return internalTopicFailed(); }); @@ -141,13 +149,15 @@ public class ActiveState extends ProcessingState { */ if (!predHost.isEmpty()) { - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (predHeartbeatSeen) { predHeartbeatSeen = false; return null; } // missed the predecessor's heart beat + logger.warn("missed predecessor's heartbeat on topic {}", getTopic()); + publish(makeQuery()); return goQuery(); @@ -172,70 +182,85 @@ public class ActiveState extends ProcessingState { String src = msg.getSource(); if (src == null) { + logger.warn("Heartbeat message has no source on topic {}", getTopic()); return null; } else if (src.equals(getHost())) { + logger.info("saw my heartbeat on topic {}", getTopic()); myHeartbeatSeen = true; } else if (src.equals(predHost)) { + logger.info("saw heartbeat from {} on topic {}", src, getTopic()); predHeartbeatSeen = true; - + + } else { + logger.info("ignored heartbeat message from {} on topic {}", src, getTopic()); } return null; } @Override + public State process(Leader msg) { + if (!isValid(msg)) { + return null; + } + + String src = msg.getSource(); + + if (getHost().compareTo(src) < 0) { + // our host would be a better leader - find out what's up + logger.warn("unexpected Leader message from {} on topic {}", src, getTopic()); + return goQuery(); + } + + logger.info("have a new leader {} on topic {}", src, getTopic()); + + return goActive(msg.getAssignments()); + } + + @Override public State process(Offline msg) { String src = msg.getSource(); if (src == null) { + logger.warn("Offline message has no source on topic {}", getTopic()); return null; } else if (!assigned.contains(src)) { /* - * the offline host wasn't assigned any buckets, so just ignore the - * message + * the offline host wasn't assigned any buckets, so just ignore the message */ + logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic()); return null; } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) { /* * Case 1: We are the leader. * - * Case 2: Our predecessor was the leader and it has gone offline - - * we should become the leader. + * Case 2: Our predecessor was the leader and it has gone offline - we should + * become the leader. * - * In either case, we are now the leader and we must re-balance the - * buckets since one of the hosts has gone offline. + * In either case, we are now the leader and we must re-balance the buckets + * since one of the hosts has gone offline. */ + logger.info("Offline message from source {} on topic {}", src, getTopic()); + assigned.remove(src); return becomeLeader(assigned); } else { /* - * Otherwise, we don't care right now - we'll wait for the leader to - * tell us it's been removed. + * Otherwise, we don't care right now - we'll wait for the leader to tell us + * it's been removed. */ + logger.info("ignore Offline message from source {} on topic {}", src, getTopic()); return null; } } - /** - * Transitions to the query state. - */ - @Override - public State process(Query msg) { - State next = super.process(msg); - if (next != null) { - return next; - } - - return goQuery(); - } - protected String getSuccHost() { return succHost; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java index 27678360..4878c241 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java @@ -21,11 +21,6 @@ package org.onap.policy.drools.pooling.state; import org.onap.policy.drools.pooling.PoolingManager; -import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; -import org.onap.policy.drools.pooling.message.Leader; -import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; /** * Idle state, used when offline. @@ -35,51 +30,4 @@ public class IdleState extends State { public IdleState(PoolingManager mgr) { super(mgr); } - - @Override - public void stop() { - // do nothing - don't even send of "offline" message - } - - /** - * Discards the message. - */ - @Override - public State process(Heartbeat msg) { - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Identification msg) { - return null; - } - - /** - * Copies the assignments, but doesn't change states. - */ - @Override - public State process(Leader msg) { - super.process(msg); - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Offline msg) { - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Query msg) { - return null; - } - } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index 1c8e4dcc..6be2fb84 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -21,13 +21,19 @@ package org.onap.policy.drools.pooling.state; import org.onap.policy.drools.pooling.PoolingManager; +import org.onap.policy.drools.pooling.message.Leader; +import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The inactive state. In this state, we just wait a bit and then try to - * re-activate. In the meantime, all messages are ignored. + * The inactive state. In this state, we just wait a bit and then try to re-activate. In + * the meantime, all messages are ignored. */ public class InactiveState extends State { + private static final Logger logger = LoggerFactory.getLogger(InactiveState.class); + /** * * @param mgr @@ -41,11 +47,36 @@ public class InactiveState extends State { super.start(); - schedule(getProperties().getReactivateMs(), xxx -> goStart()); + schedule(getProperties().getReactivateMs(), () -> goStart()); + } + + @Override + public State process(Leader msg) { + if(isValid(msg)) { + logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + + if(msg.getAssignments().hasAssignment(getHost())) { + logger.info("received Leader message on topic {}", getTopic()); + return goActive(); + } + } + + return null; + } + + /** + * Generates an Identification message and goes to the query state. + */ + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** - * Remains in this state. + * Remains in this state, without resetting any timers. */ @Override protected State goInactive() { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java index 2f830c66..1e9bb581 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java @@ -30,15 +30,13 @@ import java.util.SortedSet; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Any state in which events are being processed locally and forwarded, as - * appropriate. + * Any state in which events are being processed locally and forwarded, as appropriate. */ public class ProcessingState extends State { @@ -52,8 +50,8 @@ public class ProcessingState extends State { /** * * @param mgr - * @param leader current known leader, which need not be the same as the - * assignment leader. Never {@code null} + * @param leader current known leader, which need not be the same as the assignment + * leader. Never {@code null} * @throws IllegalArgumentException if an argument is invalid */ public ProcessingState(PoolingManager mgr, String leader) { @@ -76,21 +74,31 @@ public class ProcessingState extends State { } /** - * Generates an Identification message and returns {@code null}. + * Goes active with a new set of assignments. + * + * @param asgn new assignments + * @return the new state, either Active or Inactive, depending on whether or not this + * host has an assignment */ - @Override - public State process(Query msg) { - publish(makeIdentification()); - return goQuery(); + protected State goActive(BucketAssignments asgn) { + startDistributing(asgn); + + if (asgn.hasAssignment(getHost())) { + return goActive(); + + } else { + return goInactive(); + } } /** - * Makes an Identification message. - * - * @return a new message + * Generates an Identification message and goes to the query state. */ - protected Identification makeIdentification() { - return new Identification(getHost(), getAssignments()); + @Override + public State process(Query msg) { + logger.info("received Query message on topic {}", getTopic()); + publish(makeIdentification()); + return goQuery(); } /** @@ -132,18 +140,17 @@ public class ProcessingState extends State { } /** - * Becomes the leader. Publishes a Leader message and enters the - * {@link ActiveState}. + * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}. * * @param alive hosts that are known to be alive * * @return the new state */ protected State becomeLeader(SortedSet<String> alive) { - String leader = getHost(); + String newLeader = getHost(); - if (!leader.equals(alive.first())) { - throw new IllegalArgumentException(leader + " cannot replace " + alive.first()); + if (!newLeader.equals(alive.first())) { + throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first()); } Leader msg = makeLeader(alive); @@ -155,8 +162,8 @@ public class ProcessingState extends State { } /** - * Makes a leader message. Assumes "this" host is the leader, and thus - * appears as the first host in the set of hosts that are still alive. + * Makes a leader message. Assumes "this" host is the leader, and thus appears as the + * first host in the set of hosts that are still alive. * * @param alive hosts that are known to be alive * @@ -222,8 +229,8 @@ public class ProcessingState extends State { } /** - * Removes excess hosts from the set of available hosts. Assumes "this" host - * is the leader, and thus appears as the first host in the set. + * Removes excess hosts from the set of available hosts. Assumes "this" host is the + * leader, and thus appears as the first host in the set. * * @param maxHosts maximum number of hosts to be retained * @param avail available hosts @@ -231,9 +238,9 @@ public class ProcessingState extends State { private void removeExcessHosts(int maxHosts, SortedSet<String> avail) { while (avail.size() > maxHosts) { /* - * Don't remove this host, as it's the leader. Since the leader is - * always at the front of the sorted set, we'll just pick off hosts - * from the back of the set. + * Don't remove this host, as it's the leader. Since the leader is always at + * the front of the sorted set, we'll just pick off hosts from the back of the + * set. */ String host = avail.last(); avail.remove(host); @@ -243,15 +250,15 @@ public class ProcessingState extends State { } /** - * Adds bucket indices to {@link HostBucket} objects. Buckets that are - * unassigned or assigned to a host that does not appear within the map are - * re-assigned to a host that appears within the map. + * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or + * assigned to a host that does not appear within the map are re-assigned to a host + * that appears within the map. * * @param bucket2host bucket assignments * @param host2data maps a host name to its {@link HostBucket} */ private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) { - LinkedList<Integer> nullBuckets = new LinkedList<Integer>(); + LinkedList<Integer> nullBuckets = new LinkedList<>(); for (int x = 0; x < bucket2host.length; ++x) { String host = bucket2host[x]; @@ -274,10 +281,9 @@ public class ProcessingState extends State { } /** - * Assigns null buckets (i.e., those having no assignment) to available - * hosts. + * Assigns null buckets (i.e., those having no assignment) to available hosts. * - * @param buckets available hosts + * @param buckets buckets that still need to be assigned to hosts * @param coll collection of current host-bucket assignments */ private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) { @@ -295,9 +301,9 @@ public class ProcessingState extends State { } /** - * Re-balances the buckets, taking from those that have a larger count and - * giving to those that have a smaller count. Populates an output array with - * the new assignments. + * Re-balances the buckets, taking from those that have a larger count and giving to + * those that have a smaller count. Populates an output array with the new + * assignments. * * @param coll current bucket assignment * @param bucket2host array to be populated with the new assignments @@ -349,7 +355,7 @@ public class ProcessingState extends State { /** * Tracks buckets that have been assigned to a host. */ - public static class HostBucket implements Comparable<HostBucket> { + protected static class HostBucket implements Comparable<HostBucket> { /** * Host to which the buckets have been assigned. */ @@ -395,8 +401,8 @@ public class ProcessingState extends State { } /** - * Compares host buckets, first by the number of buckets, and then by - * the host name. + * Compares host buckets, first by the number of buckets, and then by the host + * name. */ @Override public final int compareTo(HostBucket other) { @@ -406,5 +412,15 @@ public class ProcessingState extends State { } return d; } + + @Override + public final int hashCode() { + throw new UnsupportedOperationException("HostBucket cannot be hashed"); + } + + @Override + public final boolean equals(Object obj) { + throw new UnsupportedOperationException("cannot compare HostBuckets"); + } } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 57521960..9045165b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -26,26 +26,31 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; - -// TODO add logging +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The Query state. In this state, the host waits for the other hosts to - * identify themselves. Eventually, a leader should come forth. If not, it will - * transition to the active or inactive state, depending on whether or not it - * has an assignment in the current bucket assignments. The other possibility is - * that it may <i>become</i> the leader, in which case it will also transition - * to the active state. + * The Query state. In this state, the host waits for the other hosts to identify + * themselves. Eventually, a leader should come forth. If not, it will transition to the + * active or inactive state, depending on whether or not it has an assignment in the + * current bucket assignments. The other possibility is that it may <i>become</i> the + * leader, in which case it will also transition to the active state. */ public class QueryState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(QueryState.class); + /** - * Hosts that have sent an "Identification" message. Always includes this - * host. + * Hosts that have sent an "Identification" message. Always includes this host. */ private TreeSet<String> alive = new TreeSet<>(); /** + * {@code True} if we saw our own Identification method, {@code false} otherwise. + */ + private boolean sawSelfIdent = false; + + /** * * @param mgr */ @@ -71,44 +76,42 @@ public class QueryState extends ProcessingState { private void awaitIdentification() { /* - * Once we've waited long enough for all Identification messages to - * arrive, become the leader, assuming we should. + * Once we've waited long enough for all Identification messages to arrive, become + * the leader, assuming we should. */ - schedule(getProperties().getIdentificationMs(), xxx -> { + schedule(getProperties().getIdentificationMs(), () -> { + + if (!sawSelfIdent) { + // didn't see our identification + logger.error("missed our own Ident message on topic {}", getTopic()); + return internalTopicFailed(); - if (isLeader()) { + } else if (isLeader()) { // "this" host is the new leader + logger.info("this host is the new leader for topic {}", getTopic()); return becomeLeader(alive); } else if (hasAssignment()) { /* - * this host is not the new leader, but it does have an - * assignment - return to the active state while we wait for the - * leader + * this host is not the new leader, but it does have an assignment - + * return to the active state while we wait for the leader */ + logger.info("no new leader on topic {}", getTopic()); return goActive(); } else { // not the leader and no assignment yet + logger.info("no new leader on topic {}", getTopic()); return goInactive(); } }); } /** - * Remains in this state. - */ - @Override - public State goQuery() { - return null; - } - - /** * Determines if this host has an assignment in the CURRENT assignments. * - * @return {@code true} if this host has an assignment, {@code false} - * otherwise + * @return {@code true} if this host has an assignment, {@code false} otherwise */ protected boolean hasAssignment() { BucketAssignments asgn = getAssignments(); @@ -116,53 +119,73 @@ public class QueryState extends ProcessingState { } @Override + public State goQuery() { + return null; + } + + @Override public State process(Identification msg) { - recordInfo(msg.getSource(), msg.getAssignments()); + if (getHost().equals(msg.getSource())) { + logger.info("saw our own Ident message on topic {}", getTopic()); + sawSelfIdent = true; + + } else { + logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic()); + recordInfo(msg.getSource(), msg.getAssignments()); + } return null; } /** - * If the message leader is better than the leader we have, then go active - * with it. Otherwise, simply treat it like an {@link Identification} - * message. + * If the message leader is better than the leader we have, then go active with it. + * Otherwise, simply treat it like an {@link Identification} message. */ @Override public State process(Leader msg) { - BucketAssignments asgn = msg.getAssignments(); - if (asgn == null) { + if (!isValid(msg)) { return null; } - // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null || source.equals(getHost())) { - return null; - } - - // the new leader must equal the source - if (!source.equals(asgn.getLeader())) { - return null; - } + BucketAssignments asgn = msg.getAssignments(); - // go active, if this has a better leader than the one we have - if (source.compareTo(getLeader()) < 0) { - return super.process(msg); + // go active, if this has a leader that's the same or better than the one we have + if (source.compareTo(getLeader()) <= 0) { + logger.warn("leader with {} on topic {}", source, getTopic()); + return goActive(asgn); } /* - * The message does not have an acceptable leader, but we'll still - * record its info. + * The message does not have an acceptable leader, but we'll still record its + * info. */ - recordInfo(msg.getSource(), msg.getAssignments()); + logger.info("record leader info from {} on topic {}", source, getTopic()); + recordInfo(source, asgn); + + return null; + } + + @Override + public State process(Offline msg) { + String host = msg.getSource(); + + if (host != null && !host.equals(getHost())) { + logger.warn("host {} offline on topic {}", host, getTopic()); + alive.remove(host); + setLeader(alive.first()); + + } else { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + } return null; } /** - * Records info from a message, adding the source host name to - * {@link #alive}, and updating the bucket assignments. + * Records info from a message, adding the source host name to {@link #alive}, and + * updating the bucket assignments. * * @param source the message's source host * @param assignments assignments, or {@code null} @@ -181,29 +204,18 @@ public class QueryState extends ProcessingState { // record assignments, if we don't have any yet BucketAssignments current = getAssignments(); if (current == null) { + logger.info("received initial assignments on topic {}", getTopic()); setAssignments(assignments); return; } /* - * Record assignments, if the new assignments have a better (i.e., - * lesser) leader. + * Record assignments, if the new assignments have a better (i.e., lesser) leader. */ String curldr = current.getLeader(); - if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) { + if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) { + logger.info("use new assignments from {} on topic {}", source, getTopic()); setAssignments(assignments); } } - - @Override - public State process(Offline msg) { - String host = msg.getSource(); - - if (host != null && !host.equals(getHost())) { - alive.remove(host); - setLeader(alive.first()); - } - - return null; - } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index decbdfda..a978e245 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -28,19 +28,19 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; import java.util.Map; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; -import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; -import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The start state. Upon entry, a heart beat is generated and the event filter - * is changed to look for just that particular message. Once the message is - * seen, it goes into the {@link QueryState}. + * The start state. Upon entry, a heart beat is generated and the event filter is changed + * to look for just that particular message. Once the message is seen, it goes into the + * {@link QueryState}. */ public class StartState extends State { + private static final Logger logger = LoggerFactory.getLogger(StartState.class); + /** * Time stamp inserted into the heart beat message. */ @@ -69,54 +69,28 @@ public class StartState extends State { publish(getHost(), makeHeartbeat(hbTimestampMs)); - schedule(getProperties().getStartHeartbeatMs(), xxx -> internalTopicFailed()); + schedule(getProperties().getStartHeartbeatMs(), () -> { + logger.error("missed heartbeat on topic {}", getTopic()); + return internalTopicFailed(); + }); } /** - * Transitions to the query state if the heart beat originated from this - * host and its time stamp matches. + * Transitions to the query state if the heart beat originated from this host and its + * time stamp matches. */ @Override public State process(Heartbeat msg) { if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) { // saw our own heart beat - transition to query state + logger.info("saw our own heartbeat on topic {}", getTopic()); publish(makeQuery()); return goQuery(); - } - - return null; - } - - /** - * Discards the message. - */ - @Override - public State process(Identification msg) { - return null; - } - - /** - * Processes the assignments, but remains in the current state. - */ - @Override - public State process(Leader msg) { - super.process(msg); - return null; - } - /** - * Discards the message. - */ - @Override - public State process(Offline msg) { - return null; - } + } else { + logger.info("ignored old heartbeat message from {} on topic {}", msg.getSource(), getTopic()); + } - /** - * Discards the message. - */ - @Override - public State process(Query msg) { return null; } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 1e3a907e..421b9225 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -26,7 +26,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -37,16 +37,19 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A state in the finite state machine. * <p> - * A state may have several timers associated with it, which must be cancelled - * whenever the state is changed. Assumes that timers are not continuously added - * to the same state. + * A state may have several timers associated with it, which must be cancelled whenever + * the state is changed. Assumes that timers are not continuously added to the same state. */ public abstract class State { + private static final Logger logger = LoggerFactory.getLogger(State.class); + /** * Host pool manager. */ @@ -55,7 +58,7 @@ public abstract class State { /** * Timers added by this state. */ - private final List<ScheduledFuture<?>> timers = new LinkedList<>(); + private final List<CancellableScheduledTask> timers = new LinkedList<>(); /** * @@ -66,9 +69,9 @@ public abstract class State { } /** - * Gets the server-side filter to use when polling the DMaaP internal topic. - * The default method returns a filter that accepts messages on the admin - * channel and on the host's own channel. + * Gets the server-side filter to use when polling the DMaaP internal topic. The + * default method returns a filter that accepts messages on the admin channel and on + * the host's own channel. * * @return the server-side filter to use. */ @@ -80,25 +83,15 @@ public abstract class State { /** * Cancels the timers added by this state. */ - public void cancelTimers() { - for (ScheduledFuture<?> fut : timers) { - fut.cancel(false); - } + public final void cancelTimers() { + timers.forEach(timer -> timer.cancel()); } /** - * Starts the state. + * Starts the state. The default method simply logs a message and returns. */ public void start() { - - } - - /** - * Indicates that the finite state machine is stopping. Sends an "offline" - * message to the other hosts. - */ - public void stop() { - publish(makeOffline()); + logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic()); } /** @@ -106,7 +99,7 @@ public abstract class State { * * @return the new state */ - public State goStart() { + public final State goStart() { return mgr.goStart(); } @@ -124,7 +117,7 @@ public abstract class State { * * @return the new state */ - public State goActive() { + public final State goActive() { return mgr.goActive(); } @@ -138,13 +131,14 @@ public abstract class State { } /** - * Processes a message. The default method passes it to the manager to - * handle and returns {@code null}. + * Processes a message. The default method passes it to the manager to handle and + * returns {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); mgr.handle(msg); return null; } @@ -156,6 +150,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Heartbeat msg) { + logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -166,41 +161,54 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Identification msg) { + logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic()); return null; } /** - * Processes a message. If this host has a new assignment, then it - * transitions to the active state. Otherwise, it transitions to the - * inactive state. + * Processes a message. The default method copies the assignments and then returns + * {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Leader msg) { + if (isValid(msg)) { + logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + } + + return null; + } + + /** + * Determines if a message is valid and did not originate from this host. + * + * @param msg message to be validated + * @return {@code true} if the message is valid, {@code false} otherwise + */ + protected boolean isValid(Leader msg) { BucketAssignments asgn = msg.getAssignments(); if (asgn == null) { - return null; + logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic()); + return false; } + // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null) { - return null; + if (source == null || source.equals(getHost())) { + logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic()); + return false; } // the new leader must equal the source - if (source.equals(asgn.getLeader())) { - startDistributing(asgn); - - if (asgn.hasAssignment(getHost())) { - return goActive(); + boolean result = source.equals(asgn.getLeader()); - } else { - return goInactive(); - } + if (!result) { + logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic()); } - return null; + return result; } /** @@ -210,6 +218,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -220,6 +229,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -228,7 +238,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Identification msg) { + protected final void publish(Identification msg) { mgr.publishAdmin(msg); } @@ -237,7 +247,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Leader msg) { + protected final void publish(Leader msg) { mgr.publishAdmin(msg); } @@ -246,7 +256,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Offline msg) { + protected final void publish(Offline msg) { mgr.publishAdmin(msg); } @@ -255,7 +265,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Query msg) { + protected final void publish(Query msg) { mgr.publishAdmin(msg); } @@ -265,7 +275,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Forward msg) { + protected final void publish(String channel, Forward msg) { mgr.publish(channel, msg); } @@ -275,7 +285,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Heartbeat msg) { + protected final void publish(String channel, Heartbeat msg) { mgr.publish(channel, msg); } @@ -284,7 +294,7 @@ public abstract class State { * * @param assignments */ - protected void startDistributing(BucketAssignments assignments) { + protected final void startDistributing(BucketAssignments assignments) { if (assignments != null) { mgr.startDistributing(assignments); } @@ -296,7 +306,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void schedule(long delayMs, StateTimerTask task) { + protected final void schedule(long delayMs, StateTimerTask task) { timers.add(mgr.schedule(delayMs, task)); } @@ -307,7 +317,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); } @@ -316,7 +326,7 @@ public abstract class State { * * @return a new {@link InactiveState} */ - protected State internalTopicFailed() { + protected final State internalTopicFailed() { publish(makeOffline()); mgr.internalTopicFailed(); @@ -330,16 +340,25 @@ public abstract class State { * * @return a new message */ - protected Heartbeat makeHeartbeat(long timestampMs) { + protected final Heartbeat makeHeartbeat(long timestampMs) { return new Heartbeat(getHost(), timestampMs); } /** + * Makes an Identification message. + * + * @return a new message + */ + protected Identification makeIdentification() { + return new Identification(getHost(), getAssignments()); + } + + /** * Makes an "offline" message. * * @return a new message */ - protected Offline makeOffline() { + protected final Offline makeOffline() { return new Offline(getHost()); } @@ -348,7 +367,7 @@ public abstract class State { * * @return a new message */ - protected Query makeQuery() { + protected final Query makeQuery() { return new Query(getHost()); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java index bd388b4e..346d4496 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java @@ -29,9 +29,8 @@ public interface StateTimerTask { /** * Fires the timer. * - * @param arg always {@code null} * @return the new state, or {@code null} if the state is unchanged */ - public State fire(Void arg); + public State fire(); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index f68f2395..a91671fd 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -21,6 +21,8 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -211,11 +213,11 @@ public class DmaapManagerTest { expectException("startPublisher,start", xxx -> mgr.startPublisher()); expectException("startPublisher,publish", xxx -> mgr.publish(MSG)); - + // allow it to succeed this time reset(sink); when(sink.send(any())).thenReturn(true); - + mgr.startPublisher(); verify(sink).start(); @@ -227,29 +229,64 @@ public class DmaapManagerTest { @Test public void testStopPublisher() throws PoolingFeatureException { // not publishing yet, so stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink, never()).stop(); - + // now start it mgr.startPublisher(); - + // this time, stop should do something - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); - + // re-stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); } @Test + public void testStopPublisher_WithDelay() throws PoolingFeatureException { + + mgr.startPublisher(); + + long tbeg = System.currentTimeMillis(); + + mgr.stopPublisher(100L); + + assertTrue(System.currentTimeMillis() >= tbeg + 100L); + } + + @Test + public void testStopPublisher_WithDelayInterrupted() throws Exception { + + mgr.startPublisher(); + + long minms = 2000L; + + // tell the publisher to stop in minms + additional time + Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L)); + thread.start(); + + // give the thread a chance to start + Thread.sleep(50L); + + // interrupt it - it should immediately finish its work + thread.interrupt(); + + // wait for it to stop, but only wait the minimum time + thread.join(minms); + + assertFalse(thread.isAlive()); + } + + @Test public void testStopPublisher_Exception() throws PoolingFeatureException { mgr.startPublisher(); - + // force exception when it stops doThrow(new IllegalStateException("expected")).when(sink).stop(); - mgr.stopPublisher(); + mgr.stopPublisher(0); } @Test @@ -270,14 +307,14 @@ public class DmaapManagerTest { // not consuming yet, so stopping should have no effect mgr.stopConsumer(listener); verify(source, never()).unregister(any()); - + // now start it mgr.startConsumer(listener); - + // this time, stop should do something mgr.stopConsumer(listener); verify(source).unregister(listener); - + // re-stopping should have no effect mgr.stopConsumer(listener); verify(source).unregister(listener); @@ -292,7 +329,7 @@ public class DmaapManagerTest { public void testSetFilter_Exception() throws PoolingFeatureException { // force an error when setFilter() is called doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any()); - + mgr.setFilter(FILTER); } @@ -300,41 +337,41 @@ public class DmaapManagerTest { public void testPublish() throws PoolingFeatureException { // cannot publish before starting expectException("publish,pre", xxx -> mgr.publish(MSG)); - + mgr.startPublisher(); - + // publish several messages mgr.publish(MSG); verify(sink).send(MSG); - - mgr.publish(MSG+"a"); - verify(sink).send(MSG+"a"); - - mgr.publish(MSG+"b"); - verify(sink).send(MSG+"b"); - + + mgr.publish(MSG + "a"); + verify(sink).send(MSG + "a"); + + mgr.publish(MSG + "b"); + verify(sink).send(MSG + "b"); + // stop and verify we can no longer publish - mgr.stopPublisher(); + mgr.stopPublisher(0); expectException("publish,stopped", xxx -> mgr.publish(MSG)); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendFailed() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to fail when(sink.send(MSG)).thenReturn(false); - + mgr.publish(MSG); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendEx() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to throw an exception doThrow(new IllegalStateException("expected")).when(sink).send(MSG); - + mgr.publish(MSG); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java deleted file mode 100644 index 5f918f73..00000000 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.generalize; -import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize; -import java.util.Properties; -import org.junit.Test; - -public class FeatureEnabledCheckerTest { - - private static final String PROP_NAME = "enable.{?.}it"; - - private static final String SPEC = "my.specializer"; - - @Test - public void test() { - assertFalse(check(null, null)); - assertTrue(check(null, true)); - assertFalse(check(null, false)); - - assertTrue(check(true, null)); - assertTrue(check(true, true)); - assertFalse(check(true, false)); - - assertFalse(check(false, null)); - assertTrue(check(false, true)); - assertFalse(check(false, false)); - } - - /** - * Adds properties, as specified, and checks if the feature is enabled. - * - * @param wantGen value to assign to the generalized property, or - * {@code null} to leave it unset - * @param wantSpec value to assign to the specialized property, or - * {@code null} to leave it unset - * @return {@code true} if the feature is enabled, {@code false} otherwise - */ - public boolean check(Boolean wantGen, Boolean wantSpec) { - Properties props = new Properties(); - - if (wantGen != null) { - props.setProperty(generalize(PROP_NAME), wantGen.toString()); - } - - if (wantSpec != null) { - props.setProperty(specialize(PROP_NAME, SPEC), wantSpec.toString()); - } - - return FeatureEnabledChecker.isFeatureEnabled(props, SPEC, PROP_NAME); - } - -} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java new file mode 100644 index 00000000..13d70f52 --- /dev/null +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -0,0 +1,1158 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.pooling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Arrays; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.FilterableTopicSource; +import org.onap.policy.drools.event.comm.Topic; +import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; +import org.onap.policy.drools.event.comm.TopicListener; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; +import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having + * its own feature object. + */ +public class FeatureTest { + + private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class); + + /** + * Name of the topic used for inter-host communication. + */ + private static final String INTERNAL_TOPIC = "my.internal.topic"; + + /** + * Name of the topic from which "external" events "arrive". + */ + private static final String EXTERNAL_TOPIC = "my.external.topic"; + + /** + * Name of the controller. + */ + private static final String CONTROLLER1 = "controller.one"; + + // private static final long STD_HEARTBEAT_WAIT_MS = 100; + // private static final long STD_REACTIVATE_WAIT_MS = 200; + // private static final long STD_IDENTIFICATION_MS = 60; + // private static final long STD_ACTIVE_HEARTBEAT_MS = 5; + // private static final long STD_INTER_HEARTBEAT_MS = 50; + // private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + // private static final long POLL_MS = 2; + // private static final long INTER_POLL_MS = 2; + // private static final long EVENT_WAIT_SEC = 5; + + // use these to slow things down + private static final long STD_HEARTBEAT_WAIT_MS = 5000; + private static final long STD_REACTIVATE_WAIT_MS = 10000; + private static final long STD_IDENTIFICATION_MS = 10000; + private static final long STD_ACTIVE_HEARTBEAT_MS = 5000; + private static final long STD_INTER_HEARTBEAT_MS = 12000; + private static final long STD_OFFLINE_PUB_WAIT_MS = 2; + private static final long POLL_MS = 2; + private static final long INTER_POLL_MS = 2000; + private static final long EVENT_WAIT_SEC = 1000; + + // these are saved and restored on exit from this test class + private static PoolingFeature.Factory saveFeatureFactory; + private static PoolingManagerImpl.Factory saveManagerFactory; + private static DmaapManager.Factory saveDmaapFactory; + + /** + * Context for the current test case. + */ + private Context ctx; + + @BeforeClass + public static void setUpBeforeClass() { + saveFeatureFactory = PoolingFeature.getFactory(); + saveManagerFactory = PoolingManagerImpl.getFactory(); + saveDmaapFactory = DmaapManager.getFactory(); + } + + @AfterClass + public static void tearDownAfterClass() { + PoolingFeature.setFactory(saveFeatureFactory); + PoolingManagerImpl.setFactory(saveManagerFactory); + DmaapManager.setFactory(saveDmaapFactory); + } + + @Before + public void setUp() { + ctx = null; + } + + @After + public void tearDown() { + if (ctx != null) { + ctx.destroy(); + } + } + + @Ignore + @Test + public void test_SingleHost() throws Exception { + int nmessages = 70; + + ctx = new Context(nmessages); + + ctx.addHost(); + ctx.startHosts(); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + @Ignore + @Test + public void test_TwoHosts() throws Exception { + int nmessages = 200; + + ctx = new Context(nmessages); + + ctx.addHost(); + ctx.addHost(); + ctx.startHosts(); + + for (int x = 0; x < nmessages; ++x) { + ctx.offerExternal(makeMessage(x)); + } + + // wait for all hosts to have time to process a few messages + Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3); + + // pause a topic for a bit +// ctx.pauseTopic(); + + // now we'll see if it recovers + + ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS); + + assertEquals(0, ctx.getDecodeErrors()); + assertEquals(0, ctx.getRemainingEvents()); + ctx.checkAllSawAMsg(); + } + + private String makeMessage(int reqnum) { + return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}"; + } + + /** + * Context used for a single test case. + */ + private static class Context { + + private final FeatureFactory featureFactory; + private final ManagerFactory managerFactory; + private final DmaapFactory dmaapFactory; + + /** + * Hosts that have been added to this context. + */ + private final Deque<Host> hosts = new LinkedList<>(); + + /** + * Maps a drools controller to its policy controller. + */ + private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>(); + + /** + * Maps a channel to its queue. Does <i>not</i> include the "admin" channel. + */ + private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7); + + /** + * Queue for the external "DMaaP" topic. + */ + private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>(); + + /** + * Counts the number of decode errors. + */ + private final AtomicInteger nDecodeErrors = new AtomicInteger(0); + + /** + * Number of events we're still waiting to receive. + */ + private final CountDownLatch eventCounter; + + /** + * Maps host name to its topic source. This must be in sorted order so we can + * identify the source for the host with the higher name. + */ + private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>(); + + /** + * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by + * {@link #getCurrentHost()}. + */ + private Host currentHost = null; + + /** + * + * @param nEvents number of events to be processed + */ + public Context(int nEvents) { + featureFactory = new FeatureFactory(this); + managerFactory = new ManagerFactory(this); + dmaapFactory = new DmaapFactory(this); + eventCounter = new CountDownLatch(nEvents); + + PoolingFeature.setFactory(featureFactory); + PoolingManagerImpl.setFactory(managerFactory); + DmaapManager.setFactory(dmaapFactory); + } + + /** + * Destroys the context, stopping any hosts that remain. + */ + public void destroy() { + stopHosts(); + hosts.clear(); + } + + /** + * Creates and adds a new host to the context. + */ + public void addHost() { + hosts.add(new Host(this)); + } + + /** + * Starts the hosts. + */ + public void startHosts() { + hosts.forEach(host -> host.start()); + } + + /** + * Stops the hosts. + */ + public void stopHosts() { + hosts.forEach(host -> host.stop()); + } + + /** + * Verifies that all hosts processed at least one message. + */ + public void checkAllSawAMsg() { + int x = 0; + for (Host host : hosts) { + assertTrue("x=" + x, host.messageSeen()); + ++x; + } + } + + /** + * Sets {@link #currentHost} to the specified host, and then invokes the given + * function. Resets {@link #currentHost} to {@code null} before returning. + * + * @param host + * @param func function to invoke + */ + public void withHost(Host host, VoidFunction func) { + currentHost = host; + func.apply(); + currentHost = null; + } + + /** + * Offers an event to the external topic. + * + * @param event + */ + public void offerExternal(String event) { + externalTopic.offer(event); + } + + /** + * Adds an internal channel to the set of channels. + * + * @param channel + * @param queue the channel's queue + */ + public void addInternal(String channel, BlockingQueue<String> queue) { + channel2queue.put(channel, queue); + } + + /** + * Offers a message to all internal channels. + * + * @param message + */ + public void offerInternal(String message) { + channel2queue.values().forEach(queue -> queue.offer(message)); + } + + /** + * Offers amessage to an internal channel. + * + * @param channel + * @param message + */ + public void offerInternal(String channel, String message) { + BlockingQueue<String> queue = channel2queue.get(channel); + if (queue != null) { + queue.offer(message); + } + } + + /** + * Decodes an event. + * + * @param event + * @return the decoded event, or {@code null} if it cannot be decoded + */ + public Object decodeEvent(String event) { + return managerFactory.decodeEvent(null, null, event); + } + + /** + * Associates a controller with its drools controller. + * + * @param controller + * @param droolsController + */ + public void addController(PolicyController controller, DroolsController droolsController) { + drools2policy.put(droolsController, controller); + } + + /** + * @param droolsController + * @return the controller associated with a drools controller, or {@code null} if + * it has no associated controller + */ + public PolicyController getController(DroolsController droolsController) { + return drools2policy.get(droolsController); + } + + /** + * @return queue for the external topic + */ + public BlockingQueue<String> getExternalTopic() { + return externalTopic; + } + + /** + * + * @return the number of decode errors so far + */ + public int getDecodeErrors() { + return nDecodeErrors.get(); + } + + /** + * Increments the count of decode errors. + */ + public void bumpDecodeErrors() { + nDecodeErrors.incrementAndGet(); + } + + /** + * + * @return the number of events that haven't been processed + */ + public long getRemainingEvents() { + return eventCounter.getCount(); + } + + /** + * Adds an event to the counter. + */ + public void addEvent() { + eventCounter.countDown(); + } + + /** + * Waits, for a period of time, for all events to be processed. + * + * @param time + * @param units + * @return {@code true} if all events have been processed, {@code false} otherwise + * @throws InterruptedException + */ + public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException { + return eventCounter.await(time, units); + } + + /** + * Associates a host with a topic. + * + * @param host + * @param topic + */ + public void addTopicSource(String host, TopicSourceImpl topic) { + host2topic.put(host, topic); + } + + /** + * Pauses the last topic source long enough to miss a heart beat. + */ + public void pauseTopic() { + Entry<String, TopicSourceImpl> ent = host2topic.lastEntry(); + if (ent != null) { + ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS); + } + } + + /** + * Gets the current host, provided this is used from within a call to + * {@link #withHost(Host, VoidFunction)}. + * + * @return the current host, or {@code null} if there is no current host + */ + public Host getCurrentHost() { + return currentHost; + } + } + + /** + * Simulates a single "host". + */ + private static class Host { + + private final Context context; + + private final PoolingFeature feature = new PoolingFeature(); + + /** + * {@code True} if this host has processed a message, {@code false} otherwise. + */ + private final AtomicBoolean sawMsg = new AtomicBoolean(false); + + /** + * This host's internal "DMaaP" topic. + */ + private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(); + + /** + * Source that reads from the external topic and posts to the listener. + */ + private TopicSource externalSource; + + // mock objects + private final PolicyEngine engine = mock(PolicyEngine.class); + private final ListenerController controller = mock(ListenerController.class); + private final DroolsController drools = mock(DroolsController.class); + + /** + * + * @param context + */ + public Host(Context context) { + this.context = context; + + when(controller.getName()).thenReturn(CONTROLLER1); + when(controller.getDrools()).thenReturn(drools); + + // stop consuming events if the controller stops + when(controller.stop()).thenAnswer(args -> { + externalSource.unregister(controller); + return true; + }); + + doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any()); + + context.addController(controller, drools); + + // arrange to read from the external topic + externalSource = new TopicSourceImpl(context, false); + } + + /** + * Gets the host name. This should only be invoked within {@link #start()}. + * + * @return the host name + */ + public String getName() { + return PoolingManagerImpl.getLastHost(); + } + + /** + * Starts threads for the host so that it begins consuming from both the external + * "DMaaP" topic and its own internal "DMaaP" topic. + */ + public void start() { + + context.withHost(this, () -> { + + feature.beforeStart(engine); + feature.afterCreate(controller); + + // assign the queue for this host's internal topic + context.addInternal(getName(), msgQueue); + + feature.beforeStart(controller); + + // start consuming events from the external topic + externalSource.register(controller); + + feature.afterStart(controller); + }); + } + + /** + * Stops the host's threads. + */ + public void stop() { + feature.beforeStop(controller); + externalSource.unregister(controller); + feature.afterStop(controller); + } + + /** + * Offers an event to the feature, before the policy controller handles it. + * + * @param protocol + * @param topic2 + * @param event + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { + return feature.beforeOffer(controller, protocol, topic2, event); + } + + /** + * Offers an event to the feature, after the policy controller handles it. + * + * @param protocol + * @param topic + * @param event + * @param success + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) { + + return feature.afterOffer(controller, protocol, topic, event, success); + } + + /** + * Offers an event to the feature, before the drools controller handles it. + * + * @param fact + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean beforeInsert(Object fact) { + return feature.beforeInsert(drools, fact); + } + + /** + * Offers an event to the feature, after the drools controller handles it. + * + * @param fact + * @param successInsert {@code true} if it was successfully inserted by the drools + * controller, {@code false} otherwise + * @return {@code true} if the event was handled, {@code false} otherwise + */ + public boolean afterInsert(Object fact, boolean successInsert) { + return feature.afterInsert(drools, fact, successInsert); + } + + /** + * Indicates that a message was seen for this host. + */ + public void sawMessage() { + sawMsg.set(true); + } + + /** + * + * @return {@code true} if a message was seen for this host, {@code false} + * otherwise + */ + public boolean messageSeen() { + return sawMsg.get(); + } + + /** + * @return the queue associated with this host's internal topic + */ + public BlockingQueue<String> getInternalQueue() { + return msgQueue; + } + } + + /** + * Listener for the external topic. Simulates the actions taken by + * <i>AggregatedPolicyController.onTopicEvent</i>. + */ + private static class MyExternalTopicListener implements Answer<Void> { + + private final Context context; + private final Host host; + + public MyExternalTopicListener(Context context, Host host) { + this.context = context; + this.host = host; + } + + @Override + public Void answer(InvocationOnMock args) throws Throwable { + int i = 0; + CommInfrastructure commType = args.getArgument(i++); + String topic = args.getArgument(i++); + String event = args.getArgument(i++); + + if (host.beforeOffer(commType, topic, event)) { + return null; + } + + boolean result; + Object fact = context.decodeEvent(event); + + if (fact == null) { + result = false; + context.bumpDecodeErrors(); + + } else { + result = true; + + if (!host.beforeInsert(fact)) { + // feature did not handle it so we handle it here + host.afterInsert(fact, result); + + host.sawMessage(); + context.addEvent(); + } + } + + host.afterOffer(commType, topic, event, result); + return null; + } + } + + /** + * Sink implementation that puts a message on the queue specified by the + * <i>channel</i> embedded within the message. If it's the "admin" channel, then the + * message is placed on all queues. + */ + private static class TopicSinkImpl extends TopicImpl implements TopicSink { + + private final Context context; + + /** + * Used to decode the messages so that the channel can be extracted. + */ + private final Serializer serializer = new Serializer(); + + /** + * + * @param context + */ + public TopicSinkImpl(Context context) { + this.context = context; + } + + @Override + public synchronized boolean send(String message) { + if (!isAlive()) { + return false; + } + + try { + Message msg = serializer.decodeMsg(message); + String channel = msg.getChannel(); + + if (Message.ADMIN.equals(channel)) { + // add to every queue + context.offerInternal(message); + + } else { + // add to a specific queue + context.offerInternal(channel, message); + } + + return true; + + } catch (IOException e) { + logger.warn("could not decode message: {}", message); + context.bumpDecodeErrors(); + return false; + } + } + } + + /** + * Source implementation that reads from a queue associated with a topic. + */ + private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource { + + private final String topic; + + /** + * Queue from which to retrieve messages. + */ + private final BlockingQueue<String> queue; + + /** + * Manages the current consumer thread. The "first" item is used as a trigger to + * tell the thread to stop processing, while the "second" item is triggered <i>by + * the thread</i> when it completes. + */ + private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null); + + /** + * Time, in milliseconds, to pause before polling for more messages. + */ + private AtomicLong pauseTimeMs = new AtomicLong(0); + + /** + * + * @param context + * @param internal {@code true} if to read from the internal topic, {@code false} + * to read from the external topic + */ + public TopicSourceImpl(Context context, boolean internal) { + if (internal) { + Host host = context.getCurrentHost(); + + this.topic = INTERNAL_TOPIC; + this.queue = host.getInternalQueue(); + + context.addTopicSource(host.getName(), this); + + } else { + this.topic = EXTERNAL_TOPIC; + this.queue = context.getExternalTopic(); + } + } + + @Override + public void setFilter(String filter) { + logger.info("topic filter set to: {}", filter); + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public boolean offer(String event) { + throw new UnsupportedOperationException("offer topic source"); + } + + /** + * Starts a thread that takes messages from the queue and gives them to the + * listener. Stops the thread of any previously registered listener. + */ + @Override + public void register(TopicListener listener) { + Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1)); + + reregister(newPair); + + new Thread(() -> { + try { + do { + processMessages(newPair.first(), listener); + } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS)); + + logger.info("topic source thread completed"); + + } catch (InterruptedException e) { + logger.warn("topic source thread aborted", e); + Thread.currentThread().interrupt(); + + } catch (RuntimeException e) { + logger.warn("topic source thread aborted", e); + } + + newPair.second().countDown(); + + }).start(); + } + + /** + * Stops the thread of <i>any</i> currently registered listener. + */ + @Override + public void unregister(TopicListener listener) { + reregister(null); + } + + /** + * Registers a new "pair" with this source, stopping the consumer associated with + * any previous registration. + * + * @param newPair the new "pair", or {@code null} to unregister + */ + private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) { + try { + Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair); + if (oldPair == null) { + if (newPair == null) { + // unregister was invoked twice in a row + logger.warn("re-unregister for topic source"); + } + + // no previous thread to stop + return; + } + + // need to stop the previous thread + + // tell it to stop + oldPair.first().countDown(); + + // wait for it to stop + if (!oldPair.second().await(2, TimeUnit.SECONDS)) { + logger.warn("old topic registration is still running"); + } + + } catch (InterruptedException e) { + logger.warn("old topic registration may still be running", e); + Thread.currentThread().interrupt(); + } + + if (newPair != null) { + // register was invoked twice in a row + logger.warn("re-register for topic source"); + } + } + + /** + * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should + * pause a bit. + * + * @param timeMs time, in milliseconds, to pause + */ + public void pause(long timeMs) { + pauseTimeMs.set(timeMs); + } + + /** + * Polls for messages from the topic and offers them to the listener. If + * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and + * then immediately returns. + * + * @param stopped triggered if processing should stop + * @param listener + * @throws InterruptedException + */ + private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException { + + for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) { + + long ptm = pauseTimeMs.getAndSet(0); + if (ptm != 0) { + logger.warn("pause processing"); + stopped.await(ptm, TimeUnit.MILLISECONDS); + return; + } + + String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS); + if (msg == null) { + return; + } + + listener.onTopicEvent(CommInfrastructure.UEB, topic, msg); + } + } + } + + /** + * Topic implementation. Most methods just throw + * {@link UnsupportedOperationException}. + */ + private static class TopicImpl implements Topic { + + /** + * {@code True} if this topic is alive/running, {@code false} otherwise. + */ + private boolean alive = false; + + /** + * + */ + public TopicImpl() { + super(); + } + + @Override + public String getTopic() { + return INTERNAL_TOPIC; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + throw new UnsupportedOperationException("topic protocol"); + } + + @Override + public List<String> getServers() { + throw new UnsupportedOperationException("topic servers"); + } + + @Override + public String[] getRecentEvents() { + throw new UnsupportedOperationException("topic events"); + } + + @Override + public void register(TopicListener topicListener) { + throw new UnsupportedOperationException("register topic"); + } + + @Override + public void unregister(TopicListener topicListener) { + throw new UnsupportedOperationException("unregister topic"); + } + + @Override + public synchronized boolean start() { + if (alive) { + throw new IllegalStateException("topic already started"); + } + + alive = true; + return true; + } + + @Override + public synchronized boolean stop() { + if (!alive) { + throw new IllegalStateException("topic is not running"); + } + + alive = false; + return true; + } + + @Override + public synchronized void shutdown() { + alive = false; + } + + @Override + public synchronized boolean isAlive() { + return alive; + } + + @Override + public boolean lock() { + throw new UnsupportedOperationException("lock topicink"); + } + + @Override + public boolean unlock() { + throw new UnsupportedOperationException("unlock topic"); + } + + @Override + public boolean isLocked() { + throw new UnsupportedOperationException("topic isLocked"); + } + } + + /** + * Simulator for the feature-level factory. + */ + private static class FeatureFactory extends PoolingFeature.Factory { + + private final Context context; + + /** + * + * @param context + */ + public FeatureFactory(Context context) { + this.context = context; + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public Properties getProperties(String featName) { + Properties props = new Properties(); + + props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}"); + + props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC); + props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true"); + props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000"); + props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000"); + props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS); + props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds", + "" + STD_ACTIVE_HEARTBEAT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS); + props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds", + "" + STD_OFFLINE_PUB_WAIT_MS); + + return props; + } + + @Override + public PolicyController getController(DroolsController droolsController) { + return context.getController(droolsController); + } + } + + /** + * Simulator for the pooling manager factory. + */ + private static class ManagerFactory extends PoolingManagerImpl.Factory { + + /** + * Used to decode events from the external topic. + */ + private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() { + @Override + protected ObjectMapper initialValue() { + return new ObjectMapper(); + } + }; + + /** + * Used to decode events into a Map. + */ + private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {}; + + /** + * + * @param context + */ + public ManagerFactory(Context context) { + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public boolean canDecodeEvent(DroolsController drools, String topic) { + return true; + } + + @Override + public Object decodeEvent(DroolsController drools, String topic, String event) { + try { + return mapper.get().readValue(event, typeRef); + + } catch (IOException e) { + logger.warn("cannot decode external event", e); + return null; + } + } + } + + /** + * Simulator for the dmaap manager factory. + */ + private static class DmaapFactory extends DmaapManager.Factory { + + private final Context context; + + /** + * + * @param context + */ + public DmaapFactory(Context context) { + this.context = context; + + /* + * Note: do NOT extract anything from "context" at this point, because it + * hasn't been fully initialized yet + */ + } + + @Override + public List<TopicSource> initTopicSources(Properties props) { + return Arrays.asList(new TopicSourceImpl(context, true)); + } + + @Override + public List<TopicSink> initTopicSinks(Properties props) { + return Arrays.asList(new TopicSinkImpl(context)); + } + } + + /** + * Controller that also implements the {@link TopicListener} interface. + */ + private static interface ListenerController extends PolicyController, TopicListener { + + } + + /** + * Simple function that takes no arguments and returns nothing. + */ + @FunctionalInterface + private static interface VoidFunction { + + public void apply(); + } +} diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java index 5b423d4b..34b604c9 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java @@ -31,12 +31,4 @@ public class PoolingFeatureExceptionTest extends ExceptionsTester { assertEquals(5, test(PoolingFeatureException.class)); } - @Test - public void testToRuntimeException() { - PoolingFeatureException plainExc = new PoolingFeatureException("hello"); - PoolingFeatureRtException runtimeExc = plainExc.toRuntimeException(); - - assertTrue(plainExc == runtimeExc.getCause()); - } - } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index cd1aea09..7782e475 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -40,12 +41,11 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.pooling.PoolingFeature.Factory; import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.utils.Pair; public class PoolingFeatureTest { - private static final String CONFIG_DIR = "src/test/java/org/onap/policy/drools/pooling"; - private static final String CONTROLLER1 = "controllerA"; private static final String CONTROLLER2 = "controllerB"; private static final String CONTROLLER_DISABLED = "controllerDisabled"; @@ -66,6 +66,8 @@ public class PoolingFeatureTest { */ private static Factory saveFactory; + private Properties props; + private PolicyEngine engine; private PolicyController controller1; private PolicyController controller2; private PolicyController controllerDisabled; @@ -94,6 +96,8 @@ public class PoolingFeatureTest { @Before public void setUp() throws Exception { + props = initProperties(); + engine = mock(PolicyEngine.class); factory = mock(Factory.class); controller1 = mock(PolicyController.class); controller2 = mock(PolicyController.class); @@ -113,12 +117,13 @@ public class PoolingFeatureTest { when(controllerException.getName()).thenReturn(CONTROLLER_EX); when(controllerUnknown.getName()).thenReturn(CONTROLLER_UNKNOWN); + when(factory.getProperties(PoolingProperties.FEATURE_NAME)).thenReturn(props); when(factory.getController(drools1)).thenReturn(controller1); when(factory.getController(drools2)).thenReturn(controller2); when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled); when(factory.makeManager(any(), any())).thenAnswer(args -> { - PoolingProperties props = args.getArgumentAt(1, PoolingProperties.class); + PoolingProperties props = args.getArgument(1); PoolingManagerImpl mgr = mock(PoolingManagerImpl.class); @@ -129,7 +134,7 @@ public class PoolingFeatureTest { pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); pool.afterCreate(controller1); pool.afterCreate(controller2); @@ -149,24 +154,17 @@ public class PoolingFeatureTest { } @Test - public void testGlobalInit() { - pool = new PoolingFeature(); - - pool.globalInit(null, CONFIG_DIR); - } - - @Test(expected = PoolingFeatureRtException.class) - public void testGlobalInit_NotFound() { + public void testBeforeStartEngine() { pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR + "/unknown"); + assertFalse(pool.beforeStart(engine)); } @Test public void testAfterCreate() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controller1)); assertEquals(1, managers.size()); @@ -184,7 +182,7 @@ public class PoolingFeatureTest { public void testAfterCreate_NotEnabled() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controllerDisabled)); assertTrue(managers.isEmpty()); @@ -194,7 +192,7 @@ public class PoolingFeatureTest { public void testAfterCreate_PropertyEx() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); pool.afterCreate(controllerException); } @@ -202,9 +200,9 @@ public class PoolingFeatureTest { @Test(expected = PoolingFeatureRtException.class) public void testAfterCreate_NoProps() { pool = new PoolingFeature(); - + // did not perform globalInit, which is an error - + pool.afterCreate(controller1); } @@ -212,7 +210,7 @@ public class PoolingFeatureTest { public void testAfterCreate_NoFeatProps() { managers.clear(); pool = new PoolingFeature(); - pool.globalInit(null, CONFIG_DIR); + pool.beforeStart(engine); assertFalse(pool.afterCreate(controllerUnknown)); assertTrue(managers.isEmpty()); @@ -492,4 +490,31 @@ public class PoolingFeatureTest { pool.afterStop(controller1); } + private Properties initProperties() { + Properties props = new Properties(); + + initProperties(props, "A", 0); + initProperties(props, "B", 1); + initProperties(props, "Exception", 2); + + props.setProperty("pooling.controllerDisabled.enabled", "false"); + + props.setProperty("pooling.controllerException.offline.queue.limit", "INVALID NUMBER"); + + return props; + } + + private void initProperties(Properties props, String suffix, int offset) { + props.setProperty("pooling.controller" + suffix + ".topic", "topic." + suffix); + props.setProperty("pooling.controller" + suffix + ".enabled", "true"); + props.setProperty("pooling.controller" + suffix + ".offline.queue.limit", String.valueOf(5 + offset)); + props.setProperty("pooling.controller" + suffix + ".offline.queue.age.milliseconds", + String.valueOf(100 + offset)); + props.setProperty("pooling.controller" + suffix + ".start.heartbeat.milliseconds", String.valueOf(10 + offset)); + props.setProperty("pooling.controller" + suffix + ".reactivate.milliseconds", String.valueOf(20 + offset)); + props.setProperty("pooling.controller" + suffix + ".identification.milliseconds", String.valueOf(30 + offset)); + props.setProperty("pooling.controller" + suffix + ".active.heartbeat.milliseconds", + String.valueOf(40 + offset)); + props.setProperty("pooling.controller" + suffix + ".inter.heartbeat.milliseconds", String.valueOf(50 + offset)); + } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 01ee61ef..e32fa545 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -25,7 +25,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -40,7 +41,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -71,6 +71,7 @@ public class PoolingManagerImplTest { protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1; protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1; protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1; + protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1; private static final String HOST2 = "other.host"; @@ -85,8 +86,8 @@ public class PoolingManagerImplTest { private static final String REQUEST_ID = "my.request.id"; /** - * Number of dmaap.publish() invocations that should be issued when the - * manager is started. + * Number of dmaap.publish() invocations that should be issued when the manager is + * started. */ private static final int START_PUB = 1; @@ -135,6 +136,7 @@ public class PoolingManagerImplTest { when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS); when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS); when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS); + when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS); futures = new LinkedList<>(); ser = new Serializer(); @@ -180,11 +182,6 @@ public class PoolingManagerImplTest { mgr = new PoolingManagerImpl(controller, poolProps); } - @After - public void tearDown() throws Exception { - - } - @Test public void testPoolingManagerImpl() { mgr = new PoolingManagerImpl(controller, poolProps); @@ -199,8 +196,8 @@ public class PoolingManagerImplTest { @Test public void testPoolingManagerImpl_ClassEx() { /* - * this controller does not implement TopicListener, which should cause - * a ClassCastException + * this controller does not implement TopicListener, which should cause a + * ClassCastException */ PolicyController ctlr = mock(PolicyController.class); @@ -316,6 +313,7 @@ public class PoolingManagerImplTest { verify(dmaap).stopConsumer(mgr); verify(sched).shutdownNow(); + verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); } @@ -362,7 +360,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -376,7 +374,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue, never()).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -511,7 +509,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -539,7 +537,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -608,8 +606,7 @@ public class PoolingManagerImplTest { StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -985,14 +982,35 @@ public class PoolingManagerImplTest { } @Test + public void testInject_Ex() throws Exception { + startMgr(); + + // route the message to this host + mgr.startDistributing(makeAssignments(true)); + + // generate RuntimeException when onTopicEvent() is invoked + doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any()); + + CountDownLatch latch = catchRecursion(true); + + Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); + mgr.handle(msg); + + verify(dmaap, times(START_PUB)).publish(any()); + verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); + + // ensure we made it past both beforeXxx() methods + assertEquals(0, latch.getCount()); + } + + @Test public void testHandleInternal() throws Exception { startMgr(); StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -1022,8 +1040,8 @@ public class PoolingManagerImplTest { Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); /* - * do NOT set the channel - this will cause the message to be invalid, - * triggering an exception + * do NOT set the channel - this will cause the message to be invalid, triggering + * an exception */ String msg = ser.encodeMsg(hb); @@ -1068,7 +1086,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to this host - mgr.startDistributing(makeAssignments(true)); + assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS)); // all of the events should have been processed locally verify(dmaap, times(START_PUB)).publish(any()); @@ -1088,7 +1106,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to the OTHER host - mgr.startDistributing(makeAssignments(false)); + assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS)); // all of the events should have been forwarded verify(dmaap, times(4)).publish(any()); @@ -1140,7 +1158,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1163,7 +1181,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1208,15 +1226,14 @@ public class PoolingManagerImplTest { } /** - * Configure the mock controller to act like a real controller, invoking - * beforeOffer and then beforeInsert, so we can make sure they pass through. - * We'll keep count to ensure we don't get into infinite recursion. + * Configure the mock controller to act like a real controller, invoking beforeOffer + * and then beforeInsert, so we can make sure they pass through. We'll keep count to + * ensure we don't get into infinite recursion. * - * @param invokeBeforeInsert {@code true} if beforeInsert() should be - * invoked, {@code false} if it should be skipped + * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked, + * {@code false} if it should be skipped * - * @return a latch that will be counted down if both beforeXxx() methods - * return false + * @return a latch that will be counted down if both beforeXxx() methods return false */ private CountDownLatch catchRecursion(boolean invokeBeforeInsert) { CountDownLatch recursion = new CountDownLatch(3); @@ -1230,9 +1247,9 @@ public class PoolingManagerImplTest { } int iarg = 0; - CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class); - String topic = args.getArgumentAt(iarg++, String.class); - String event = args.getArgumentAt(iarg++, String.class); + CommInfrastructure proto = args.getArgument(iarg++); + String topic = args.getArgument(iarg++); + String event = args.getArgument(iarg++); if (mgr.beforeOffer(proto, topic, event)) { return null; @@ -1253,9 +1270,8 @@ public class PoolingManagerImplTest { /** * Makes an assignment with two buckets. * - * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the - * manager's bucket, {@code false} if it should hash to the other - * host's bucket + * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the + * manager's bucket, {@code false} if it should hash to the other host's bucket * @return a new bucket assignment */ private BucketAssignments makeAssignments(boolean sameHost) { diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java index 63eb59d4..2d734c1c 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java @@ -30,6 +30,7 @@ import static org.onap.policy.drools.pooling.PoolingProperties.IDENTIFICATION_MS import static org.onap.policy.drools.pooling.PoolingProperties.INTER_HEARTBEAT_MS; import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_AGE_MS; import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_LIMIT; +import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_PUB_WAIT_MS; import static org.onap.policy.drools.pooling.PoolingProperties.POOLING_TOPIC; import static org.onap.policy.drools.pooling.PoolingProperties.REACTIVATE_MS; import static org.onap.policy.drools.pooling.PoolingProperties.START_HEARTBEAT_MS; @@ -53,6 +54,7 @@ public class PoolingPropertiesTest { public static final long STD_LEADER_MS = 5000L; public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L; public static final long STD_INTER_HEARTBEAT_MS = 7000L; + public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L; private Properties plain; private PoolingProperties pooling; @@ -121,10 +123,15 @@ public class PoolingPropertiesTest { doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs()); } + @Test + public void testGetOfflinePubWaitMs() throws PropertyException { + doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs()); + } + /** - * Tests a particular property. Verifies that the correct value is returned - * if the specialized property has a value or the property has no value. - * Also verifies that the property name can be generalized. + * Tests a particular property. Verifies that the correct value is returned if the + * specialized property has a value or the property has no value. Also verifies that + * the property name can be generalized. * * @param propnm name of the property of interest * @param specValue expected specialized value @@ -140,8 +147,8 @@ public class PoolingPropertiesTest { assertEquals("special " + propnm, specValue, func.apply(null)); /* - * Ensure the property supports generalization - this will throw an - * exception if it does not. + * Ensure the property supports generalization - this will throw an exception if + * it does not. */ assertFalse(propnm.equals(generalize(propnm))); @@ -155,8 +162,8 @@ public class PoolingPropertiesTest { } /** - * Makes a set of properties, where all of the properties are specialized - * for the controller. + * Makes a set of properties, where all of the properties are specialized for the + * controller. * * @return a new property set */ @@ -172,6 +179,7 @@ public class PoolingPropertiesTest { props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS); props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS); props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS); + props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS); return props; } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java index 8b495099..505dc400 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java @@ -49,8 +49,8 @@ public class SpecPropertiesTest { private static final String PREFIX_SPEC = PREFIX_GEN + MY_SPEC + "."; /** - * Suffix to add to property names to generate names of properties that are - * not populated. + * Suffix to add to property names to generate names of properties that are not + * populated. */ private static final String SUFFIX = ".suffix"; @@ -175,6 +175,16 @@ public class SpecPropertiesTest { assertNull(props.getProperty(gen(PROP_UNKNOWN), null)); } + @Test(expected = UnsupportedOperationException.class) + public void testHashCode() { + props.hashCode(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testEquals() { + props.equals(props); + } + private String gen(String propnm) { if (propnm.startsWith(PREFIX_SPEC)) { return PREFIX_GEN + propnm.substring(PREFIX_SPEC.length()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java index ef03d4d6..c14e8dba 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; import org.junit.Test; @@ -196,16 +197,21 @@ public class BucketAssignmentsTest { String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"}; asgn.setHostArray(arr); - for (int x = 0; x < arr.length; ++x) { - assertEquals("x=" + x, arr[x], asgn.getAssignedHost(x)); + /* + * get assignments for consecutive integers, including negative numbers and + * numbers extending past the length of the array. + * + */ + TreeSet<String> seen = new TreeSet<>(); + for (int x = -1; x < arr.length + 2; ++x) { + seen.add(asgn.getAssignedHost(x)); } - // negative - assertNull(asgn.getAssignedHost(-1)); + TreeSet<String> expected = new TreeSet<>(Arrays.asList(arr)); + assertEquals(expected, seen); - // beyond end - assertNull(asgn.getAssignedHost(arr.length)); - assertNull(asgn.getAssignedHost(arr.length + 1)); + // try a much bigger number + assertNotNull(asgn.getAssignedHost(arr.length * 1000)); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java index 7997a4ee..7b4b0602 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java @@ -24,9 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -39,11 +39,12 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; +import org.onap.policy.drools.utils.Triple; public class ActiveStateTest extends BasicStateTester { @@ -65,7 +66,7 @@ public class ActiveStateTest extends BasicStateTester { // ensure a heart beat was generated Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -183,8 +184,8 @@ public class ActiveStateTest extends BasicStateTester { /* * - * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader - * thus should be ignored. + * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus + * should be ignored. */ assertNull(state.process(new Offline(PREV_HOST2))); } @@ -196,22 +197,57 @@ public class ActiveStateTest extends BasicStateTester { state = new ActiveState(mgr); /* - * HOST1 has buckets, but it isn't the leader and it isn't my - * predecessor, thus should be ignored. + * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus + * should be ignored. */ assertNull(state.process(new Offline(HOST1))); } @Test - public void testProcessQuery() { + public void testProcessLeader_Invalid() { + Leader msg = new Leader(PREV_HOST, null); + + // should stay in the same state, and not start distributing + assertNull(state.process(msg)); + verify(mgr, never()).startDistributing(any()); + verify(mgr, never()).goActive(); + verify(mgr, never()).goInactive(); + + // info should be unchanged + assertEquals(MY_HOST, state.getLeader()); + assertEquals(ASGN3, state.getAssignments()); + } + + @Test + public void testProcessLeader_BadLeader() { + String[] arr = {HOST2, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + + // now send a Leader message for that leader + Leader msg = new Leader(HOST1, asgn); + State next = mock(State.class); when(mgr.goQuery()).thenReturn(next); - assertEquals(next, state.process(new Query())); + // should go Query, but not start distributing + assertEquals(next, state.process(msg)); + verify(mgr, never()).startDistributing(asgn); + } + + @Test + public void testProcessLeader_GoodLeader() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + // now send a Leader message for that leader + Leader msg = new Leader(PREV_HOST, asgn); + + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - Identification ident = captureAdminMessage(Identification.class); - assertEquals(MY_HOST, ident.getSource()); - assertEquals(ASGN3, ident.getAssignments()); + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); } @Test @@ -256,24 +292,24 @@ public class ActiveStateTest extends BasicStateTester { // invoke start() to add the timers state.start(); - assertEquals(3, repeatedFutures.size()); + assertEquals(3, repeatedSchedules.size()); Triple<Long, Long, StateTimerTask> timer; // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); // predecessor's heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -285,19 +321,19 @@ public class ActiveStateTest extends BasicStateTester { // invoke start() to add the timers state.start(); - assertEquals(2, repeatedFutures.size()); + assertEquals(2, repeatedSchedules.size()); Triple<Long, Long, StateTimerTask> timer; // heart beat generator timer = repeatedTasks.remove(); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue()); // my heart beat checker timer = repeatedTasks.remove(); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue()); - assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue()); + assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue()); } @Test @@ -314,14 +350,14 @@ public class ActiveStateTest extends BasicStateTester { verify(mgr).publish(anyString(), any(Heartbeat.class)); // fire the task - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); // should have generated a second pair of heart beats verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class)); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -339,7 +375,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goInactive()).thenReturn(next); // fire the task - should not transition - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); verify(mgr, never()).publishAdmin(any(Query.class)); } @@ -356,7 +392,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goInactive()).thenReturn(next); // fire the task - should transition - assertEquals(next, task.third.fire(null)); + assertEquals(next, task.third().fire()); // should indicate failure verify(mgr).internalTopicFailed(); @@ -381,7 +417,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goQuery()).thenReturn(next); // fire the task - should NOT transition - assertNull(task.third.fire(null)); + assertNull(task.third().fire()); verify(mgr, never()).publishAdmin(any(Query.class)); } @@ -398,7 +434,7 @@ public class ActiveStateTest extends BasicStateTester { when(mgr.goQuery()).thenReturn(next); // fire the task - should transition - assertEquals(next, task.third.fire(null)); + assertEquals(next, task.third().fire()); verify(mgr).publishAdmin(any(Query.class)); } @@ -414,8 +450,8 @@ public class ActiveStateTest extends BasicStateTester { verify(mgr, times(1)).publish(any(), any()); Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } @Test @@ -429,13 +465,13 @@ public class ActiveStateTest extends BasicStateTester { // this message should go to itself msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(MY_HOST, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(MY_HOST, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); // this message should go to its successor msg = capturePublishedMessage(Heartbeat.class, index++); - assertEquals(HOST1, msg.first); - assertEquals(MY_HOST, msg.second.getSource()); + assertEquals(HOST1, msg.first()); + assertEquals(MY_HOST, msg.second().getSource()); } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java index e48742f7..75ca7564 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java @@ -21,9 +21,9 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,16 +33,18 @@ import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.utils.Pair; +import org.onap.policy.drools.utils.Triple; /** - * Superclass used to test subclasses of {@link Message}. + * Superclass used to test subclasses of {@link State}. */ public class BasicStateTester { @@ -74,9 +76,9 @@ public class BasicStateTester { protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3); /** - * Futures returned by schedule(). + * Scheduled tasks returned by schedule(). */ - protected LinkedList<ScheduledFuture<?>> onceFutures; + protected LinkedList<CancellableScheduledTask> onceSchedules; /** * Tasks captured via schedule(). @@ -84,9 +86,9 @@ public class BasicStateTester { protected LinkedList<Pair<Long, StateTimerTask>> onceTasks; /** - * Futures returned by scheduleWithFixedDelay(). + * Scheduled tasks returned by scheduleWithFixedDelay(). */ - protected LinkedList<ScheduledFuture<?>> repeatedFutures; + protected LinkedList<CancellableScheduledTask> repeatedSchedules; /** * Tasks captured via scheduleWithFixedDelay(). @@ -112,10 +114,10 @@ public class BasicStateTester { } public void setUp() throws Exception { - onceFutures = new LinkedList<>(); + onceSchedules = new LinkedList<>(); onceTasks = new LinkedList<>(); - repeatedFutures = new LinkedList<>(); + repeatedSchedules = new LinkedList<>(); repeatedTasks = new LinkedList<>(); published = new LinkedList<>(); @@ -162,9 +164,9 @@ public class BasicStateTester { Object[] args = invocation.getArguments(); onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1])); - ScheduledFuture<?> fut = mock(ScheduledFuture.class); - onceFutures.add(fut); - return fut; + CancellableScheduledTask sched = mock(CancellableScheduledTask.class); + onceSchedules.add(sched); + return sched; }); // capture scheduleWithFixedDelay() arguments, and return a new future @@ -172,9 +174,9 @@ public class BasicStateTester { Object[] args = invocation.getArguments(); repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2])); - ScheduledFuture<?> fut = mock(ScheduledFuture.class); - repeatedFutures.add(fut); - return fut; + CancellableScheduledTask sched = mock(CancellableScheduledTask.class); + repeatedSchedules.add(sched); + return sched; }); // get/set assignments in the manager @@ -183,7 +185,7 @@ public class BasicStateTester { when(mgr.getAssignments()).thenAnswer(args -> asgn.get()); doAnswer(args -> { - asgn.set(args.getArgumentAt(0, BucketAssignments.class)); + asgn.set(args.getArgument(0)); return null; }).when(mgr).startDistributing(any()); } @@ -199,8 +201,7 @@ public class BasicStateTester { } /** - * Captures the host array from the Leader message published to the admin - * channel. + * Captures the host array from the Leader message published to the admin channel. * * @return the host array, as a list */ @@ -209,8 +210,7 @@ public class BasicStateTester { } /** - * Captures the host array from the Leader message published to the admin - * channel. + * Captures the host array from the Leader message published to the admin channel. * * @return the host array */ @@ -224,8 +224,7 @@ public class BasicStateTester { } /** - * Captures the assignments from the Leader message published to the admin - * channel. + * Captures the assignments from the Leader message published to the admin channel. * * @return the bucket assignments */ @@ -277,42 +276,6 @@ public class BasicStateTester { */ protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) { Pair<String, Message> msg = published.get(index); - return new Pair<>(msg.first, clazz.cast(msg.second)); + return new Pair<>(msg.first(), clazz.cast(msg.second())); } - - /** - * Pair of values. - * - * @param <F> first value's type - * @param <S> second value's type - */ - public static class Pair<F, S> { - public final F first; - public final S second; - - public Pair(F first, S second) { - this.first = first; - this.second = second; - } - } - - /** - * Pair of values. - * - * @param <F> first value's type - * @param <S> second value's type - * @param <T> third value's type - */ - public static class Triple<F, S, T> { - public final F first; - public final S second; - public final T third; - - public Triple(F first, S second, T third) { - this.first = first; - this.second = second; - this.third = third; - } - } - } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java index 96c59719..95cbe753 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java @@ -61,12 +61,6 @@ public class IdleStateTest extends BasicStateTester { } @Test - public void testStop() { - state.stop(); - verifyNothingPublished(); - } - - @Test public void testProcessForward() { Forward msg = new Forward(); assertNull(state.process(msg)); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java index 48d5b1ed..394adaee 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java @@ -28,6 +28,7 @@ import java.util.Map; import org.junit.Before; import org.junit.Test; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.utils.Pair; public class InactiveStateTest extends BasicStateTester { @@ -62,20 +63,20 @@ public class InactiveStateTest extends BasicStateTester { Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue()); + assertEquals(STD_REACTIVATE_WAIT_MS, timer.first().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goStart()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); } @Test public void testInactiveState() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java index d60ad2ea..e1718418 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,16 +38,19 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.pooling.state.ProcessingState.HostBucket; public class ProcessingStateTest extends BasicStateTester { private ProcessingState state; + private HostBucket hostBucket; @Before public void setUp() throws Exception { super.setUp(); state = new ProcessingState(mgr, MY_HOST); + hostBucket = new HostBucket(MY_HOST); } @Test @@ -62,6 +65,39 @@ public class ProcessingStateTest extends BasicStateTester { } @Test + public void testGoActive_WithAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(act, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + } + + @Test + public void testGoActive_WithoutAssignment() { + State act = mock(State.class); + State inact = mock(State.class); + + when(mgr.goActive()).thenReturn(act); + when(mgr.goInactive()).thenReturn(inact); + + String[] arr = {HOST2, PREV_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + + assertEquals(inact, state.goActive(asgn)); + + verify(mgr).startDistributing(asgn); + + } + + @Test public void testProcessQuery() { State next = mock(State.class); when(mgr.goQuery()).thenReturn(next); @@ -97,8 +133,8 @@ public class ProcessingStateTest extends BasicStateTester { state = new ProcessingState(mgr, LEADER); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); @@ -260,8 +296,8 @@ public class ProcessingStateTest extends BasicStateTester { @Test public void testMakeBucketArray() { /* - * All hosts are still alive, so it should have the exact same - * assignments as it had to start. + * All hosts are still alive, so it should have the exact same assignments as it + * had to start. */ state.setAssignments(ASGN3); state.becomeLeader(sortHosts(HOST_ARR3)); @@ -325,4 +361,80 @@ public class ProcessingStateTest extends BasicStateTester { assertEquals(Arrays.asList(expected), captureHostList()); } + @Test + public void testHostBucketRemove_testHostBucketAdd_testHostBucketSize() { + assertEquals(0, hostBucket.size()); + + hostBucket.add(20); + hostBucket.add(30); + hostBucket.add(40); + assertEquals(3, hostBucket.size()); + + assertEquals(20, hostBucket.remove().intValue()); + assertEquals(30, hostBucket.remove().intValue()); + assertEquals(1, hostBucket.size()); + + // add more before taking the last item + hostBucket.add(50); + hostBucket.add(60); + assertEquals(3, hostBucket.size()); + + assertEquals(40, hostBucket.remove().intValue()); + assertEquals(50, hostBucket.remove().intValue()); + assertEquals(60, hostBucket.remove().intValue()); + assertEquals(0, hostBucket.size()); + + // add more AFTER taking the last item + hostBucket.add(70); + assertEquals(70, hostBucket.remove().intValue()); + assertEquals(0, hostBucket.size()); + } + + @Test + public void testHostBucketCompareTo() { + HostBucket hb1 = new HostBucket(PREV_HOST); + HostBucket hb2 = new HostBucket(MY_HOST); + + assertEquals(0, hb1.compareTo(hb1)); + assertEquals(0, hb1.compareTo(new HostBucket(PREV_HOST))); + + // both empty + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + + // hb1 has one bucket, so it should not be larger + hb1.add(100); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // hb1 has two buckets, so it should still be larger + hb1.add(200); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // hb1 has two buckets, hb2 has one, so hb1 should still be larger + hb2.add(1000); + assertTrue(hb1.compareTo(hb2) > 0); + assertTrue(hb2.compareTo(hb1) < 0); + + // same number of buckets, so hb2 should now be larger + hb2.add(2000); + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + + // hb2 has more buckets, it should still be larger + hb2.add(3000); + assertTrue(hb1.compareTo(hb2) < 0); + assertTrue(hb2.compareTo(hb1) > 0); + } + + @Test(expected = UnsupportedOperationException.class) + public void testHostBucketHashCode() { + hostBucket.hashCode(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testHostBucketEquals() { + hostBucket.equals(hostBucket); + } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java index d714d5cc..a7c3a3d5 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java @@ -25,9 +25,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; @@ -38,7 +39,7 @@ import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; public class QueryStateTest extends BasicStateTester { @@ -63,44 +64,48 @@ public class QueryStateTest extends BasicStateTester { } @Test + public void testGoQuery() { + assertNull(state.goQuery()); + } + + @Test public void testStart() { state.start(); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); } @Test - public void testGoQuery() { - assertNull(state.process(new Query())); - assertEquals(ASGN3, state.getAssignments()); - } + public void testProcessIdentification_SameSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - @Test - public void testProcessIdentification_NullSource() { - assertNull(state.process(new Identification())); + assertNull(state.process(new Identification(MY_HOST, asgn))); + // info should be unchanged assertEquals(MY_HOST, state.getLeader()); + verify(mgr, never()).startDistributing(asgn); } @Test - public void testProcessIdentification_NewLeader() { - assertNull(state.process(new Identification(PREV_HOST, null))); - - assertEquals(PREV_HOST, state.getLeader()); - } + public void testProcessIdentification_DiffSource() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - @Test - public void testProcessIdentification_NotNewLeader() { - assertNull(state.process(new Identification(HOST2, null))); + assertNull(state.process(new Identification(HOST2, asgn))); + // leader should be unchanged assertEquals(MY_HOST, state.getLeader()); + + // should have picked up the assignments + verify(mgr).startDistributing(asgn); } @Test - public void testProcessLeader_NullAssignment() { + public void testProcessLeader_Invalid() { Leader msg = new Leader(PREV_HOST, null); // should stay in the same state, and not start distributing @@ -115,67 +120,55 @@ public class QueryStateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullSource() { + public void testProcessLeader_SameLeader() { String[] arr = {HOST2, PREV_HOST, MY_HOST}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(null, asgn); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + // identify a leader that's better than my host + assertEquals(null, state.process(new Identification(PREV_HOST, asgn))); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); - } + // now send a Leader message for that leader + Leader msg = new Leader(PREV_HOST, asgn); - @Test - public void testProcessLeader_SourceIsNotAssignmentLeader() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; - BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(HOST2, asgn); + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); + // should go Active and start distributing + assertEquals(next, state.process(msg)); verify(mgr, never()).goInactive(); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); + // Ident msg + Leader msg = times(2) + verify(mgr, times(2)).startDistributing(asgn); } @Test - public void testProcessLeader_EmptyAssignment() { - Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + public void testProcessLeader_BetterLeaderWithAssignment() { + String[] arr = {HOST2, PREV_HOST, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(PREV_HOST, asgn); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + State next = mock(State.class); + when(mgr.goActive()).thenReturn(next); - // info should be unchanged - assertEquals(MY_HOST, state.getLeader()); - assertEquals(ASGN3, state.getAssignments()); + // should go Active and start distributing + assertEquals(next, state.process(msg)); + verify(mgr).startDistributing(asgn); + verify(mgr, never()).goInactive(); } @Test - public void testProcessLeader_BetterLeader() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; + public void testProcessLeader_BetterLeaderWithoutAssignment() { + String[] arr = {HOST2, PREV_HOST, HOST1}; BucketAssignments asgn = new BucketAssignments(arr); Leader msg = new Leader(PREV_HOST, asgn); State next = mock(State.class); - when(mgr.goActive()).thenReturn(next); + when(mgr.goInactive()).thenReturn(next); - // should go Active and start distributing + // should go Inactive, but start distributing assertEquals(next, state.process(msg)); verify(mgr).startDistributing(asgn); - verify(mgr, never()).goInactive(); + verify(mgr, never()).goActive(); } @Test @@ -241,41 +234,48 @@ public class QueryStateTest extends BasicStateTester { } @Test - public void testProcessQuery() { - BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2}); - mgr.startDistributing(asgn); - state = new QueryState(mgr); - - State next = mock(State.class); - when(mgr.goQuery()).thenReturn(next); - - assertEquals(null, state.process(new Query())); - - verify(mgr).publishAdmin(any(Identification.class)); - } - - @Test public void testQueryState() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @Test + public void testAwaitIdentification_MissingSelfIdent() { + state.start(); + + Pair<Long, StateTimerTask> timer = onceTasks.remove(); + + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); + + // should published an Offline message and go inactive + + State next = mock(State.class); + when(mgr.goInactive()).thenReturn(next); + + assertEquals(next, timer.second().fire()); + + Offline msg = captureAdminMessage(Offline.class); + assertEquals(MY_HOST, msg.getSource()); + } + + @Test public void testAwaitIdentification_Leader() { state.start(); + state.process(new Identification(MY_HOST, null)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should have published a Leader message Leader msg = captureAdminMessage(Leader.class); @@ -291,20 +291,21 @@ public class QueryStateTest extends BasicStateTester { state = new QueryState(mgr); state.start(); + state.process(new Identification(MY_HOST, null)); // tell it the leader is still active state.process(new Identification(PREV_HOST, asgn)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); // set up active state, as that's what it should return State next = mock(State.class); when(mgr.goActive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); @@ -321,20 +322,21 @@ public class QueryStateTest extends BasicStateTester { state = new QueryState(mgr); state.start(); + state.process(new Identification(MY_HOST, null)); // tell it the leader is still active state.process(new Identification(PREV_HOST, asgn)); Pair<Long, StateTimerTask> timer = onceTasks.remove(); - assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue()); - assertNotNull(timer.second); + assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue()); + assertNotNull(timer.second()); // set up inactive state, as that's what it should return State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); // should NOT have published a Leader message assertTrue(admin.isEmpty()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java index f29d2348..af4e8f13 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java @@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,6 +38,7 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.onap.policy.drools.utils.Pair; public class StartStateTest extends BasicStateTester { @@ -74,18 +75,18 @@ public class StartStateTest extends BasicStateTester { Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class); - assertEquals(MY_HOST, msg.first); - assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs()); + assertEquals(MY_HOST, msg.first()); + assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs()); Pair<Long, StateTimerTask> timer = onceTasks.removeFirst(); - assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue()); + assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue()); // invoke the task - it should go to the state returned by the mgr State next = mock(State.class); when(mgr.goInactive()).thenReturn(next); - assertEquals(next, timer.second.fire(null)); + assertEquals(next, timer.second().fire()); verify(mgr).internalTopicFailed(); } @@ -93,8 +94,8 @@ public class StartStateTest extends BasicStateTester { @Test public void testStartStatePoolingManager() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -105,8 +106,8 @@ public class StartStateTest extends BasicStateTester { state = new StartState(mgr); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java index 1be48e21..a184dfad 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java @@ -21,16 +21,18 @@ package org.onap.policy.drools.pooling.state; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Map; -import java.util.concurrent.ScheduledFuture; import org.junit.Before; import org.junit.Test; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Forward; @@ -55,8 +57,8 @@ public class StateTest extends BasicStateTester { @Test public void testStatePoolingManager() { /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -67,8 +69,8 @@ public class StateTest extends BasicStateTester { state = new MyState(mgr); /* - * Prove the state is attached to the manager by invoking getHost(), - * which delegates to the manager. + * Prove the state is attached to the manager by invoking getHost(), which + * delegates to the manager. */ assertEquals(MY_HOST, state.getHost()); } @@ -98,13 +100,13 @@ public class StateTest extends BasicStateTester { verify(mgr).schedule(delay, task2); verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3); - ScheduledFuture<?> fut1 = onceFutures.removeFirst(); - ScheduledFuture<?> fut2 = onceFutures.removeFirst(); - ScheduledFuture<?> fut3 = repeatedFutures.removeFirst(); + CancellableScheduledTask sched1 = onceSchedules.removeFirst(); + CancellableScheduledTask sched2 = onceSchedules.removeFirst(); + CancellableScheduledTask sched3 = repeatedSchedules.removeFirst(); - verify(fut1, never()).cancel(false); - verify(fut2, never()).cancel(false); - verify(fut3, never()).cancel(false); + verify(sched1, never()).cancel(); + verify(sched2, never()).cancel(); + verify(sched3, never()).cancel(); /* * Cancel the timers. @@ -112,9 +114,9 @@ public class StateTest extends BasicStateTester { state.cancelTimers(); // verify that all were cancelled - verify(fut1).cancel(false); - verify(fut2).cancel(false); - verify(fut3).cancel(false); + verify(sched1).cancel(); + verify(sched2).cancel(); + verify(sched3).cancel(); } @Test @@ -134,13 +136,6 @@ public class StateTest extends BasicStateTester { } @Test - public void testStop() { - state.stop(); - - assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource()); - } - - @Test public void testGoStart() { State next = mock(State.class); when(mgr.goStart()).thenReturn(next); @@ -195,7 +190,18 @@ public class StateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullAssignment() { + public void testProcessLeader() { + String[] arr = {HOST2, HOST1}; + BucketAssignments asgn = new BucketAssignments(arr); + Leader msg = new Leader(HOST1, asgn); + + // should ignore it + assertEquals(null, state.process(msg)); + verify(mgr).startDistributing(asgn); + } + + @Test + public void testProcessLeader_Invalid() { Leader msg = new Leader(PREV_HOST, null); // should stay in the same state, and not start distributing @@ -206,57 +212,44 @@ public class StateTest extends BasicStateTester { } @Test - public void testProcessLeader_NullSource() { + public void testIsValidLeader_NullAssignment() { + assertFalse(state.isValid(new Leader(PREV_HOST, null))); + } + + @Test + public void testIsValidLeader_NullSource() { String[] arr = {HOST2, PREV_HOST, MY_HOST}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(null, asgn); + assertFalse(state.isValid(new Leader(null, asgn))); + } - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + @Test + public void testIsValidLeader_EmptyAssignment() { + assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments()))); } @Test - public void testProcessLeader_EmptyAssignment() { - Leader msg = new Leader(PREV_HOST, new BucketAssignments()); + public void testIsValidLeader_FromSelf() { + String[] arr = {HOST2, MY_HOST}; + BucketAssignments asgn = new BucketAssignments(arr); - // should stay in the same state, and not start distributing - assertNull(state.process(msg)); - verify(mgr, never()).startDistributing(any()); - verify(mgr, never()).goActive(); - verify(mgr, never()).goInactive(); + assertFalse(state.isValid(new Leader(MY_HOST, asgn))); } @Test - public void testProcessLeader_MyHostAssigned() { - String[] arr = {HOST2, PREV_HOST, MY_HOST}; + public void testIsValidLeader_WrongLeader() { + String[] arr = {HOST2, HOST3}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(PREV_HOST, asgn); - - State next = mock(State.class); - when(mgr.goActive()).thenReturn(next); - // should go Active and start distributing - assertEquals(next, state.process(msg)); - verify(mgr).startDistributing(asgn); - verify(mgr, never()).goInactive(); + assertFalse(state.isValid(new Leader(HOST1, asgn))); } @Test - public void testProcessLeader_MyHostUnassigned() { + public void testIsValidLeader() { String[] arr = {HOST2, HOST1}; BucketAssignments asgn = new BucketAssignments(arr); - Leader msg = new Leader(HOST1, asgn); - State next = mock(State.class); - when(mgr.goInactive()).thenReturn(next); - - // should go Inactive and start distributing - assertEquals(next, state.process(msg)); - verify(mgr).startDistributing(asgn); - verify(mgr, never()).goActive(); + assertTrue(state.isValid(new Leader(HOST1, asgn))); } @Test @@ -344,18 +337,18 @@ public class StateTest extends BasicStateTester { state.schedule(delay, task); - ScheduledFuture<?> fut = onceFutures.removeFirst(); + CancellableScheduledTask sched = onceSchedules.removeFirst(); // scheduled, but not canceled yet verify(mgr).schedule(delay, task); - verify(fut, never()).cancel(false); + verify(sched, never()).cancel(); /* - * Ensure the state added the timer to its list by telling it to cancel - * its timers and then seeing if this timer was canceled. + * Ensure the state added the timer to its list by telling it to cancel its timers + * and then seeing if this timer was canceled. */ state.cancelTimers(); - verify(fut).cancel(false); + verify(sched).cancel(); } @Test @@ -367,18 +360,18 @@ public class StateTest extends BasicStateTester { state.scheduleWithFixedDelay(initdel, delay, task); - ScheduledFuture<?> fut = repeatedFutures.removeFirst(); + CancellableScheduledTask sched = repeatedSchedules.removeFirst(); // scheduled, but not canceled yet verify(mgr).scheduleWithFixedDelay(initdel, delay, task); - verify(fut, never()).cancel(false); + verify(sched, never()).cancel(); /* - * Ensure the state added the timer to its list by telling it to cancel - * its timers and then seeing if this timer was canceled. + * Ensure the state added the timer to its list by telling it to cancel its timers + * and then seeing if this timer was canceled. */ state.cancelTimers(); - verify(fut).cancel(false); + verify(sched).cancel(); } @Test |