summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src')
-rw-r--r--feature-pooling-dmaap/src/assembly/assemble_zip.xml76
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java (renamed from feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java)30
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java44
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java144
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java98
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java9
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java32
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java291
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java27
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java34
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java19
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java2
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java45
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java1
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java95
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java52
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java39
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java142
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java60
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java127
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java3
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java93
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java74
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java1158
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java8
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java65
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java92
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java22
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java14
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties13
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java20
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java114
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java85
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java9
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java122
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java168
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java19
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java125
41 files changed, 2506 insertions, 1168 deletions
diff --git a/feature-pooling-dmaap/src/assembly/assemble_zip.xml b/feature-pooling-dmaap/src/assembly/assemble_zip.xml
new file mode 100644
index 00000000..9908a2b9
--- /dev/null
+++ b/feature-pooling-dmaap/src/assembly/assemble_zip.xml
@@ -0,0 +1,76 @@
+<!--
+ ============LICENSE_START=======================================================
+ feature-pooling-dmaap
+ ================================================================================
+ Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<!-- Defines how we build the .zip file which is our distribution. -->
+
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>feature-pooling-dmaap</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <!-- we want "system" and related files right at the root level as this
+ file is suppose to be unzip on top of a karaf distro. -->
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>lib/feature</outputDirectory>
+ <includes>
+ <include>feature-pooling-dmaap-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target/assembly/lib</directory>
+ <outputDirectory>lib/dependencies</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/config</directory>
+ <outputDirectory>config</outputDirectory>
+ <fileMode>0644</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/db</directory>
+ <outputDirectory>db</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/install</directory>
+ <outputDirectory>install</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java
index 428b5853..b16f44ad 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/Trial.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/CancellableScheduledTask.java
@@ -1,5 +1,6 @@
/*
* ============LICENSE_START=======================================================
+ * ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -17,25 +18,16 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.drools.pooling.message;
+package org.onap.policy.drools.pooling;
-import org.junit.Test;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class Trial {
-
- @Test
- public void test() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
-
- Message msg = new Forward("me", CommInfrastructure.DMAAP, "my topic", "a message", "my req");
-
- String enc = mapper.writeValueAsString(msg);
- System.out.println("enc=" + enc);
-
- Message msg2 = mapper.readValue(enc, Message.class);
- System.out.println("class=" + msg2.getClass());
- }
+/**
+ * A scheduled task that can be cancelled.
+ */
+@FunctionalInterface
+public interface CancellableScheduledTask {
+ /**
+ * Cancels the scheduled task.
+ */
+ public void cancel();
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 98543f29..102eda75 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -58,14 +58,14 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the
- * internal DMaaP topic.
+ * Topic sources. In theory, there's only one item in this list, the internal DMaaP
+ * topic.
*/
private final List<TopicSource> sources;
/**
- * Topic sinks. In theory, there's only one item in this list, the internal
- * DMaaP topic.
+ * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
+ * topic.
*/
private final List<TopicSink> sinks;
@@ -112,8 +112,8 @@ public class DmaapManager {
}
/**
- * Used by junit tests to set the factory used to create various objects
- * used by this class.
+ * Used by junit tests to set the factory used to create various objects used by this
+ * class.
*
* @param factory the new factory
*/
@@ -129,8 +129,7 @@ public class DmaapManager {
* Finds the topic source associated with the internal DMaaP topic.
*
* @return the topic source
- * @throws PoolingFeatureException if the source doesn't exist or is not
- * filterable
+ * @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : sources) {
@@ -174,6 +173,7 @@ public class DmaapManager {
}
try {
+ logger.info("start publishing to topic {}", topic);
topicSink.start();
publishing = true;
@@ -184,13 +184,28 @@ public class DmaapManager {
/**
* Stops the publisher.
+ *
+ * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued
+ * messages and close
*/
- public void stopPublisher() {
+ public void stopPublisher(long waitMs) {
if (!publishing) {
return;
}
+ /*
+ * Give the sink a chance to transmit messages in the queue. It would be better if
+ * "waitMs" could be passed to sink.stop(), but that isn't an option at this time.
+ */
+ try {
+ Thread.sleep(waitMs);
+
+ } catch (InterruptedException e) {
+ logger.warn("message transmission stopped due to {}", e.getMessage());
+ }
+
try {
+ logger.info("stop publishing to topic {}", topic);
publishing = false;
topicSink.stop();
@@ -209,6 +224,7 @@ public class DmaapManager {
return;
}
+ logger.info("start consuming from topic {}", topic);
topicSource.register(listener);
consuming = true;
}
@@ -223,6 +239,7 @@ public class DmaapManager {
return;
}
+ logger.info("stop consuming from topic {}", topic);
consuming = false;
topicSource.unregister(listener);
}
@@ -230,16 +247,16 @@ public class DmaapManager {
/**
* Sets the server-side filter to be used by the consumer.
*
- * @param filter the filter string, or {@code null} if no filter is to be
- * used
+ * @param filter the filter string, or {@code null} if no filter is to be used
* @throws PoolingFeatureException if the topic is not filterable
*/
public void setFilter(String filter) throws PoolingFeatureException {
try {
+ logger.debug("change filter for topic {} to {}", topic, filter);
topicSource.setFilter(filter);
} catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic);
+ throw new PoolingFeatureException("cannot filter topic " + topic, e);
}
}
@@ -247,8 +264,7 @@ public class DmaapManager {
* Publishes a message to the sink.
*
* @param msg message to be published
- * @throws PoolingFeatureException if an error occurs or the publisher isn't
- * running
+ * @throws PoolingFeatureException if an error occurs or the publisher isn't running
*/
public void publish(String msg) throws PoolingFeatureException {
if (!publishing) {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java
deleted file mode 100644
index d2f32043..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/FeatureEnabledChecker.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-// TODO move to policy-utils
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.SpecPropertyConfiguration;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-/**
- * Checks whether or not a feature is enabled. The name of the "enable" property
- * is assumed to be of the form accepted by a {@link SpecPropertyConfiguration},
- * which contains a substitution place-holder into which a "specializer" (e.g.,
- * controller or session name) is substituted.
- */
-public class FeatureEnabledChecker {
-
- /**
- *
- */
- private FeatureEnabledChecker() {
- super();
- }
-
- /**
- * Determines if a feature is enabled for a particular specializer.
- *
- * @param props properties from which to extract the "enabled" flag
- * @param specializer specializer to be substituted into the property name
- * when extracting
- * @param propName the name of the "enabled" property
- * @return {@code true} if the feature is enabled, or {@code false} if it is
- * not enabled (or if the property doesn't exist)
- * @throws IllegalArgumentException if the "enabled" property is not a
- * boolean value
- */
- public static boolean isFeatureEnabled(Properties props, String specializer, String propName) {
-
- try {
- return new Config(specializer, props, propName).isEnabled();
-
- } catch (PropertyException e) {
- throw new IllegalArgumentException("cannot check property " + propName, e);
- }
- }
-
-
- /**
- * Configuration used to extract the value.
- */
- private static class Config extends SpecPropertyConfiguration {
-
- /**
- * There is a bit of trickery here. This annotation is just a
- * place-holder to get the superclass to invoke the
- * {@link #setValue(java.lang.reflect.Field, Properties, Property)
- * setValue()} method. When that's invoked, we'll substitute
- * {@link #propOverride} instead of this annotation.
- */
- @Property(name = "feature-enabled-property-place-holder")
- private boolean enabled;
-
- /**
- * Annotation that will actually be used to set the field.
- */
- private Property propOverride;
-
- /**
- *
- * @param specializer specializer to be substituted into the property
- * name when extracting
- * @param props properties from which to extract the "enabled" flag
- * @param propName the name of the "enabled" property
- * @throws PropertyException if an error occurs
- */
- public Config(String specializer, Properties props, String propName) throws PropertyException {
- super(specializer);
-
- propOverride = new Property() {
-
- @Override
- public String name() {
- return propName;
- }
-
- @Override
- public String defaultValue() {
- // feature is disabled by default
- return "false";
- }
-
- @Override
- public String accept() {
- return "";
- }
-
- @Override
- public Class<? extends Annotation> annotationType() {
- return Property.class;
- }
- };
-
- setAllFields(props);
- }
-
- /**
- * Substitutes {@link #propOverride} for "prop".
- */
- @Override
- protected boolean setValue(Field field, Properties props, Property prop) throws PropertyException {
- return super.setValue(field, props, propOverride);
- }
-
- /**
- *
- * @return {@code true} if the feature is enabled, {@code false}
- * otherwise
- */
- public boolean isEnabled() {
- return enabled;
- }
- };
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index da47a031..21cbc4db 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -20,44 +20,41 @@
package org.onap.policy.drools.pooling;
-import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.core.PolicySessionFeatureAPI;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
+import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
+import org.onap.policy.drools.persistence.SystemPersistence;
import org.onap.policy.drools.system.PolicyController;
-import org.onap.policy.drools.utils.PropertyUtil;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.util.FeatureEnabledChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Controller/session pooling. Multiple hosts may be launched, all servicing the
- * same controllers/sessions. When this feature is enabled, the requests are
- * divided across the different hosts, instead of all running on a single,
- * active host.
+ * Controller/session pooling. Multiple hosts may be launched, all servicing the same
+ * controllers/sessions. When this feature is enabled, the requests are divided across the
+ * different hosts, instead of all running on a single, active host.
* <p>
- * With each controller, there is an associated DMaaP topic that is used for
- * internal communication between the different hosts serving the controller.
+ * With each controller, there is an associated DMaaP topic that is used for internal
+ * communication between the different hosts serving the controller.
*/
-public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI {
+public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI {
private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
- // TODO state-management doesn't allow more than one active host at a time
-
/**
* Factory used to create objects.
*/
private static Factory factory;
/**
- * Entire set of feature properties, including those specific to various
- * controllers.
+ * Entire set of feature properties, including those specific to various controllers.
*/
private Properties featProps = null;
@@ -67,9 +64,9 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
/**
- * Arguments passed to beforeOffer(), which are saved for when the
- * beforeInsert() is called later. As multiple threads can be active within
- * the methods at the same time, we must keep this in thread local storage.
+ * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
@@ -98,20 +95,11 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
return 0;
}
- /**
- * @throws PoolingFeatureRtException if the properties cannot be read or are
- * invalid
- */
@Override
- public void globalInit(String[] args, String configDir) {
- logger.info("initializing pooling feature");
-
- try {
- featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties");
-
- } catch (IOException ex) {
- throw new PoolingFeatureRtException(ex);
- }
+ public boolean beforeStart(PolicyEngine engine) {
+ logger.info("initializing " + PoolingProperties.FEATURE_NAME);
+ featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
+ return false;
}
/**
@@ -205,8 +193,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
@Override
public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
/*
- * As this is invoked a lot, we'll directly call the manager's method
- * instead of using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of
+ * using the functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -226,6 +214,7 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
OfferArgs args = offerArgs.get();
if (args == null) {
+ logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
@@ -234,16 +223,21 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
controller = factory.getController(droolsController);
} catch (IllegalArgumentException | IllegalStateException e) {
+ logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
+ droolsController.getArtifactId(), e);
return false;
}
+
if (controller == null) {
+ logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
+ droolsController.getArtifactId());
return false;
}
/*
- * As this is invoked a lot, we'll directly call the manager's method
- * instead of using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of
+ * using the functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -264,14 +258,12 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
}
/**
- * Executes a function using the manager associated with the controller.
- * Catches any exceptions from the function and re-throws it as a runtime
- * exception.
+ * Executes a function using the manager associated with the controller. Catches any
+ * exceptions from the function and re-throws it as a runtime exception.
*
* @param controller
* @param func function to be executed
- * @return {@code true} if the function handled the request, {@code false}
- * otherwise
+ * @return {@code true} if the function handled the request, {@code false} otherwise
* @throws PoolingFeatureRtException if an error occurs
*/
private boolean doManager(PolicyController controller, MgrFunc func) {
@@ -284,26 +276,28 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
return func.apply(mgr);
} catch (PoolingFeatureException e) {
- throw e.toRuntimeException();
+ throw new PoolingFeatureRtException(e);
}
}
/**
- * Executes a function using the manager associated with the controller and
- * then deletes the manager. Catches any exceptions from the function and
- * re-throws it as a runtime exception.
+ * Executes a function using the manager associated with the controller and then
+ * deletes the manager. Catches any exceptions from the function and re-throws it as a
+ * runtime exception.
*
* @param controller
* @param func function to be executed
- * @return {@code true} if the function handled the request, {@code false}
- * otherwise
+ * @return {@code true} if the function handled the request, {@code false} otherwise
* @throws PoolingFeatureRtException if an error occurs
*/
private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
+ String name = controller.getName();
+ logger.info("remove feature-pool-dmaap manager for {}", name);
+
// NOTE: using "remove()" instead of "get()"
- PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName());
+ PoolingManagerImpl mgr = ctlr2pool.remove(name);
if (mgr == null) {
return false;
@@ -321,8 +315,8 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
/**
*
* @param mgr
- * @return {@code true} if the request was handled by the manager,
- * {@code false} otherwise
+ * @return {@code true} if the request was handled by the manager, {@code false}
+ * otherwise
* @throws PoolingFeatureException
*/
public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
@@ -367,6 +361,14 @@ public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControl
public static class Factory {
/**
+ * @param featName feature name
+ * @return the properties for the specified feature
+ */
+ public Properties getProperties(String featName) {
+ return SystemPersistence.manager.getProperties(featName);
+ }
+
+ /**
* Makes a pooling manager for a controller.
*
* @param controller
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
index 5efd1414..c3c81879 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeatureException.java
@@ -47,13 +47,4 @@ public class PoolingFeatureException extends Exception {
super(message, cause, enableSuppression, writableStackTrace);
}
- /**
- * Converts the exception to a runtime exception.
- *
- * @return a new runtime exception, wrapping this exception
- */
- public PoolingFeatureRtException toRuntimeException() {
- return new PoolingFeatureRtException(this);
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index de08d1e1..5036b605 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledFuture;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
@@ -48,19 +47,19 @@ public interface PoolingManager {
public String getHost();
/**
- * Gets the name of the internal DMaaP topic used by this manager to
- * communicate with its other hosts.
+ * Gets the name of the internal DMaaP topic used by this manager to communicate with
+ * its other hosts.
*
* @return the name of the internal DMaaP topic
*/
public String getTopic();
/**
- * Indicates that communication with internal DMaaP topic failed, typically
- * due to a missed heart beat. Stops the PolicyController.
+ * Indicates that communication with internal DMaaP topic failed, typically due to a
+ * missed heart beat. Stops the PolicyController.
*
- * @return a latch that can be used to determine when the controller's
- * stop() method has completed
+ * @return a latch that can be used to determine when the controller's stop() method
+ * has completed
*/
public CountDownLatch internalTopicFailed();
@@ -68,14 +67,16 @@ public interface PoolingManager {
* Starts distributing requests according to the given bucket assignments.
*
* @param assignments must <i>not</i> be {@code null}
+ * @return a latch that can be used to determine when the events in the event queue
+ * have all be processed
*/
- public void startDistributing(BucketAssignments assignments);
+ public CountDownLatch startDistributing(BucketAssignments assignments);
/**
* Gets the current bucket assignments.
*
- * @return the current bucket assignments, or {@code null} if no assignments
- * have been made
+ * @return the current bucket assignments, or {@code null} if no assignments have been
+ * made
*/
public BucketAssignments getAssignments();
@@ -95,8 +96,7 @@ public interface PoolingManager {
public void publish(String channel, Message msg);
/**
- * Handles a {@link Forward} event that was received from the internal
- * topic.
+ * Handles a {@link Forward} event that was received from the internal topic.
*
* @param event
*/
@@ -107,9 +107,9 @@ public interface PoolingManager {
*
* @param delayMs delay, in milliseconds
* @param task
- * @return a future that can be used to cancel the timer
+ * @return a new scheduled task
*/
- public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task);
+ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task);
/**
* Schedules a timer to fire repeatedly.
@@ -117,9 +117,9 @@ public interface PoolingManager {
* @param initialDelayMs initial delay, in milliseconds
* @param delayMs delay, in milliseconds
* @param task
- * @return a future that can be used to cancel the timer
+ * @return a new scheduled task
*/
- public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task);
+ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task);
/**
* Transitions to the "start" state.
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index cd71670d..422efdd7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -38,6 +39,7 @@ import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.state.ActiveState;
import org.onap.policy.drools.pooling.state.IdleState;
import org.onap.policy.drools.pooling.state.InactiveState;
@@ -52,39 +54,30 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
- * Implementation of a {@link PoolingManager}. Until bucket assignments have
- * been made, events coming from external topics are saved in a queue for later
- * processing. Once assignments are made, the saved events are processed. In
- * addition, while the controller is locked, events are still forwarded to other
- * hosts and bucket assignments are still updated, based on any {@link Leader}
- * messages that it receives.
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
+ * events coming from external topics are saved in a queue for later processing. Once
+ * assignments are made, the saved events are processed. In addition, while the controller
+ * is locked, events are still forwarded to other hosts and bucket assignments are still
+ * updated, based on any {@link Leader} messages that it receives.
*/
public class PoolingManagerImpl implements PoolingManager, TopicListener {
private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
- // TODO metrics, audit logging
-
/**
* Maximum number of times a message can be forwarded.
*/
public static final int MAX_HOPS = 5;
/**
- * Type of item that the extractors will be extracting.
- */
- private static final String EXTRACTOR_TYPE = "requestId";
-
- /**
- * Prefix for extractor properties.
+ * Factory used to create various objects. Can be overridden during junit testing.
*/
- private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
+ private static Factory factory = new Factory();
/**
- * Factory used to create various objects. Can be overridden during junit
- * testing.
+ * ID of the last host that was created.
*/
- private static Factory factory = new Factory();
+ private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
/**
* ID of this host.
@@ -102,14 +95,12 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the
- * controller).
+ * Where to offer events that have been forwarded to this host (i.e, the controller).
*/
private final TopicListener listener;
/**
- * Used to encode & decode request objects received from & sent to a rule
- * engine.
+ * Used to encode & decode request objects received from & sent to a rule engine.
*/
private final Serializer serializer;
@@ -129,19 +120,18 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final ClassExtractors extractors;
/**
- * Lock used while updating {@link #current}. In general, public methods
- * must use this, while private methods assume the lock is already held.
+ * Lock used while updating {@link #current}. In general, public methods must use
+ * this, while private methods assume the lock is already held.
*/
private final Object curLocker = new Object();
/**
* Current state.
* <p>
- * This uses a finite state machine, wherein the state object contains all
- * of the data relevant to that state. Each state object has a process()
- * method, specific to each type of {@link Message} subclass. The method
- * returns the next state object, or {@code null} if the state is to remain
- * the same.
+ * This uses a finite state machine, wherein the state object contains all of the data
+ * relevant to that state. Each state object has a process() method, specific to each
+ * type of {@link Message} subclass. The method returns the next state object, or
+ * {@code null} if the state is to remain the same.
*/
private State current;
@@ -177,13 +167,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.controller = controller;
this.props = props;
+ lastHost.set(this.host);
+
try {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
- SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName());
+ SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
+ props.getSource());
this.extractors = factory.makeClassExtractors(spec);
this.dmaapMgr = factory.makeDmaapManager(props);
@@ -197,7 +190,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
} catch (PoolingFeatureException e) {
logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
- throw e.toRuntimeException();
+ throw new PoolingFeatureRtException(e);
}
}
@@ -210,6 +203,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
+ * Used by junit tests.
+ *
+ * @return the ID of the last host that was created, or {@code null} if no hosts have
+ * been created yet
+ */
+ protected static String getLastHost() {
+ return lastHost.get();
+ }
+
+ /**
* Should only be used by junit tests.
*
* @return the current state
@@ -234,22 +237,22 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller is about to start. Starts the publisher for
- * the internal topic, and creates a thread pool for the timers.
+ * Indicates that the controller is about to start. Starts the publisher for the
+ * internal topic, and creates a thread pool for the timers.
*
- * @throws PoolingFeatureException if the internal topic publisher cannot be
- * started
+ * @throws PoolingFeatureException if the internal topic publisher cannot be started
*/
public void beforeStart() throws PoolingFeatureException {
synchronized (curLocker) {
if (scheduler == null) {
dmaapMgr.startPublisher();
+ logger.debug("make scheduler thread for topic {}", getTopic());
scheduler = factory.makeScheduler();
/*
- * Only a handful of timers at any moment, thus we can afford to
- * take the time to remove them when they're cancelled.
+ * Only a handful of timers at any moment, thus we can afford to take the
+ * time to remove them when they're cancelled.
*/
scheduler.setRemoveOnCancelPolicy(true);
scheduler.setMaximumPoolSize(1);
@@ -260,9 +263,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller has successfully started. Starts the
- * consumer for the internal topic, enters the {@link StartState}, and sets
- * the filter for the initial state.
+ * Indicates that the controller has successfully started. Starts the consumer for the
+ * internal topic, enters the {@link StartState}, and sets the filter for the initial
+ * state.
*/
public void afterStart() {
synchronized (curLocker) {
@@ -274,8 +277,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Indicates that the controller is about to stop. Stops the consumer, the
- * scheduler, and the current state.
+ * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
+ * and the current state.
*/
public void beforeStop() {
ScheduledThreadPoolExecutor sched;
@@ -287,23 +290,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
if (!(current instanceof IdleState)) {
dmaapMgr.stopConsumer(this);
changeState(new IdleState(this));
-
- // TODO
- /*
- * Need a brief delay here to allow "offline" message to be
- * transmitted?
- */
+ publishAdmin(new Offline(getHost()));
}
}
if (sched != null) {
+ logger.debug("stop scheduler for topic {}", getTopic());
sched.shutdownNow();
}
}
/**
- * Indicates that the controller has stopped. Stops the publisher and logs a
- * warning if any events are still in the queue.
+ * Indicates that the controller has stopped. Stops the publisher and logs a warning
+ * if any events are still in the queue.
*/
public void afterStop() {
synchronized (curLocker) {
@@ -312,25 +311,33 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
eventq.clear();
}
- dmaapMgr.stopPublisher();
+ /*
+ * stop the publisher, but allow time for any Offline message to be
+ * transmitted
+ */
+ dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
}
}
/**
- * Indicates that the controller is about to be locked. Enters the idle
- * state, as all it will be doing is forwarding messages.
+ * Indicates that the controller is about to be locked. Enters the idle state, as all
+ * it will be doing is forwarding messages.
*/
public void beforeLock() {
+ logger.info("locking manager for topic {}", getTopic());
+
synchronized (curLocker) {
changeState(new IdleState(this));
}
}
/**
- * Indicates that the controller has been unlocked. Enters the start state,
- * if the controller is running.
+ * Indicates that the controller has been unlocked. Enters the start state, if the
+ * controller is running.
*/
public void afterUnlock() {
+ logger.info("unlocking manager for topic {}", getTopic());
+
synchronized (curLocker) {
if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
changeState(new StartState(this));
@@ -339,8 +346,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Changes the finite state machine to a new state, provided the new state
- * is not {@code null}.
+ * Changes the finite state machine to a new state, provided the new state is not
+ * {@code null}.
*
* @param newState new state, or {@code null} if to remain unchanged
*/
@@ -379,30 +386,46 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
CountDownLatch latch = new CountDownLatch(1);
/*
- * We don't want to build up items in our queue if we can't forward them
- * to other hosts, so we just stop the controller.
+ * We don't want to build up items in our queue if we can't forward them to other
+ * hosts, so we just stop the controller.
*
* Use a background thread to prevent deadlocks.
*/
- new Thread() {
- @Override
- public void run() {
- controller.stop();
- latch.countDown();
- }
- }.start();
+ new Thread(() -> {
+ controller.stop();
+ latch.countDown();
+ }).start();
return latch;
}
@Override
- public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) {
- return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+ public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return new CancellableScheduledTask() {
+ @Override
+ public void cancel() {
+ fut.cancel(false);
+ }
+ };
}
@Override
- public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
- return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
+ public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ // wrap the task in a TimerAction and schedule it
+ ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
+ TimeUnit.MILLISECONDS);
+
+ // wrap the future in a "CancellableScheduledTask"
+ return new CancellableScheduledTask() {
+ @Override
+ public void cancel() {
+ fut.cancel(false);
+ }
+ };
}
@Override
@@ -412,6 +435,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public void publish(String channel, Message msg) {
+ logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
+
msg.setChannel(channel);
try {
@@ -434,8 +459,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*
* @param topic2
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * controller should handle it
+ * @return {@code true} if the event was handled, {@code false} if the controller
+ * should handle it
*/
@Override
public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
@@ -452,20 +477,19 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Called by the PolicyController before it offers the event to the
- * DroolsController. If the controller is locked, then it isn't processing
- * events. However, they still need to be forwarded, thus in that case, they
- * are decoded and forwarded.
+ * Called by the PolicyController before it offers the event to the DroolsController.
+ * If the controller is locked, then it isn't processing events. However, they still
+ * need to be forwarded, thus in that case, they are decoded and forwarded.
* <p>
- * On the other hand, if the controller is not locked, then we just return
- * immediately and let {@link #beforeInsert(Object, String, String, Object)
- * beforeInsert()} handle it instead, as it already has the decoded message.
+ * On the other hand, if the controller is not locked, then we just return immediately
+ * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
+ * it instead, as it already has the decoded message.
*
* @param protocol
* @param topic2
* @param event
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
@@ -478,15 +502,14 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Called by the DroolsController before it inserts the event into the rule
- * engine.
+ * Called by the DroolsController before it inserts the event into the rule engine.
*
* @param protocol
* @param topic2
* @param event original event text, as received from the Bus
* @param event2 event, as an object
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
@@ -504,10 +527,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param protocol
* @param topic2
* @param event
- * @param reqid request id extracted from the event, or {@code null} if it
- * couldn't be extracted
- * @return {@code true} if the event was handled by the manager,
- * {@code false} if it must still be handled by the invoker
+ * @param reqid request id extracted from the event, or {@code null} if it couldn't be
+ * extracted
+ * @return {@code true} if the event was handled by the manager, {@code false} if it
+ * must still be handled by the invoker
*/
private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
if (reqid == null) {
@@ -524,6 +547,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
Forward ev = makeForward(protocol, topic2, event, reqid);
if (ev == null) {
// invalid args - consume the message
+ logger.warn("constructed an invalid Forward message on topic {}", getTopic());
return true;
}
@@ -536,12 +560,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Handles an event from an external topic.
*
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * invoker should handle it
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
*/
private boolean handleExternal(Forward event) {
if (assignments == null) {
// no bucket assignments yet - add it to the queue
+ logger.info("queued event for request {}", event.getRequestId());
eventq.add(event);
// we've consumed the event
@@ -556,17 +581,17 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Handles a {@link Forward} event, possibly forwarding it again.
*
* @param event
- * @return {@code true} if the event was handled, {@code false} if the
- * invoker should handle it
+ * @return {@code true} if the event was handled, {@code false} if the invoker should
+ * handle it
*/
private boolean handleEvent(Forward event) {
- int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size();
- String target = assignments.getAssignedHost(bucket);
+ String target = assignments.getAssignedHost(event.getRequestId().hashCode());
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
+ logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
return true;
}
@@ -574,6 +599,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
+ logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
return false;
}
@@ -583,6 +609,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
topic);
} else {
+ logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
event.bumpNumHops();
publish(target, event);
}
@@ -678,16 +705,21 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param event
*/
private void inject(Forward event) {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+ logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
- intercept = true;
+ try {
+ intercept = false;
+ listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+
+ } finally {
+ intercept = true;
+ }
}
/**
- * Handles an event from the internal topic. This uses reflection to
- * identify the appropriate process() method to invoke, based on the type of
- * Message that was decoded.
+ * Handles an event from the internal topic. This uses reflection to identify the
+ * appropriate process() method to invoke, based on the type of Message that was
+ * decoded.
*
* @param event the serialized {@link Message} read from the internal topic
*/
@@ -711,29 +743,48 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
} catch (NoSuchMethodException | SecurityException e) {
logger.error("no processor for message {} for topic {}", clazz, topic, e);
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.error("failed to process message {} for topic {}", clazz, topic, e);
-
- } catch (PoolingFeatureException e) {
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | PoolingFeatureException e) {
logger.error("failed to process message {} for topic {}", clazz, topic, e);
}
}
@Override
- public void startDistributing(BucketAssignments assignments) {
- if (assignments == null) {
- return;
+ public CountDownLatch startDistributing(BucketAssignments asgn) {
+ if (asgn == null) {
+ return null;
}
+ logger.info("new assignments for topic {}", getTopic());
+
synchronized (curLocker) {
- this.assignments = assignments;
+ assignments = asgn;
+ }
+
+ /*
+ * publish the events from the event queue, but do it in a background thread so
+ * that the state machine can enter its correct state BEFORE we start processing
+ * the events
+ */
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ synchronized (curLocker) {
+ if (assignments == null) {
+ return;
+ }
- // now that we have assignments, we can process the queue
- Forward ev;
- while ((ev = eventq.poll()) != null) {
- handle(ev);
+ // now that we have assignments, we can process the queue
+ Forward ev;
+ while ((ev = eventq.poll()) != null) {
+ handle(ev);
+ }
+
+ latch.countDown();
}
- }
+ }).start();
+
+ return latch;
}
@Override
@@ -762,8 +813,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Action to run a timer task. Only runs the task if the machine is still in
- * the state that it was in when the timer was created.
+ * Action to run a timer task. Only runs the task if the machine is still in the state
+ * that it was in when the timer was created.
*/
private class TimerAction implements Runnable {
@@ -790,7 +841,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
public void run() {
synchronized (curLocker) {
if (current == origState) {
- changeState(task.fire(null));
+ changeState(task.fire());
}
}
}
@@ -818,7 +869,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @return a new set of extractors
*/
public ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE);
+ return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
+ PoolingProperties.EXTRACTOR_TYPE);
}
/**
@@ -846,8 +898,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*
* @param drools drools controller
* @param topic topic on which the event was received
- * @return {@code true} if the event can be decoded, {@code false}
- * otherwise
+ * @return {@code true} if the event can be decoded, {@code false} otherwise
*/
public boolean canDecodeEvent(DroolsController drools, String topic) {
return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
index 1cbe5cb9..4e8de0d0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
@@ -30,6 +30,11 @@ import org.onap.policy.common.utils.properties.exception.PropertyException;
public class PoolingProperties extends SpecPropertyConfiguration {
/**
+ * The feature name, used to retrieve properties.
+ */
+ public static final String FEATURE_NAME = "feature-pooling-dmaap";
+
+ /**
* Feature properties all begin with this prefix.
*/
public static final String PREFIX = "pooling.";
@@ -51,6 +56,17 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
+ public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
+
+ /**
+ * Type of item that the extractors will be extracting.
+ */
+ public static final String EXTRACTOR_TYPE = "requestId";
+
+ /**
+ * Prefix for extractor properties.
+ */
+ public static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
/**
* Properties from which this was constructed.
@@ -113,6 +129,13 @@ public class PoolingProperties extends SpecPropertyConfiguration {
private long interHeartbeatMs;
/**
+ * Time, in milliseconds, to wait for an "Offline" message to be published
+ * to DMaaP.
+ */
+ @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+ private long offlinePubWaitMs;
+
+ /**
* @param controllerName the name of the controller
* @param props set of properties used to configure this
* @throws PropertyException if an error occurs
@@ -159,4 +182,8 @@ public class PoolingProperties extends SpecPropertyConfiguration {
public long getInterHeartbeatMs() {
return interHeartbeatMs;
}
+
+ public long getOfflinePubWaitMs() {
+ return offlinePubWaitMs;
+ }
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
index e4557404..31b4e614 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -23,8 +23,7 @@ package org.onap.policy.drools.pooling;
import java.util.Properties;
/**
- * Properties with an optional specialization (e.g., session name, controller
- * name).
+ * Properties with an optional specialization (e.g., session name, controller name).
*/
public class SpecProperties extends Properties {
private static final long serialVersionUID = 1L;
@@ -41,10 +40,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any
- * specialization
- * @param specialization the property name specialization (e.g., session
- * name)
+ * @param prefix the property name prefix that appears before any specialization
+ * @param specialization the property name specialization (e.g., session name)
*/
public SpecProperties(String prefix, String specialization) {
this.prefix = withTrailingDot(prefix);
@@ -53,10 +50,8 @@ public class SpecProperties extends Properties {
/**
*
- * @param prefix the property name prefix that appears before any
- * specialization
- * @param specialization the property name specialization (e.g., session
- * name)
+ * @param prefix the property name prefix that appears before any specialization
+ * @param specialization the property name specialization (e.g., session name)
* @param props the default properties
*/
public SpecProperties(String prefix, String specialization, Properties props) {
@@ -77,13 +72,14 @@ public class SpecProperties extends Properties {
}
/**
- * Gets the property whose value has the given key, looking first for the
- * specialized property name, and then for the generalized property name.
+ * Gets the property whose value has the given key, looking first for the specialized
+ * property name, and then for the generalized property name.
*
* @param key property name, without the specialization
- * @return the value from the property set, or {@code null} if the property
- * set does not contain the value
+ * @return the value from the property set, or {@code null} if the property set does
+ * not contain the value
*/
+ @Override
public String getProperty(String key) {
if (!key.startsWith(prefix)) {
return super.getProperty(key);
@@ -106,4 +102,14 @@ public class SpecProperties extends Properties {
protected String getSpecPrefix() {
return specPrefix;
}
+
+ @Override
+ public final int hashCode() {
+ throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ throw new UnsupportedOperationException("cannot compare HostBuckets");
+ }
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
index cb12a6ac..2752ca8c 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -152,7 +152,7 @@ public class ClassExtractors {
* cannot extract data items from objects of this type, so just
* allocated a null extractor.
*/
- logger.warn("missing property " + prefix + clazz.getName());
+ logger.warn("missing property {}{}", prefix, clazz.getName());
return new NullExtractor();
}
@@ -166,24 +166,24 @@ public class ClassExtractors {
*/
private Extractor buildExtractor(Class<?> clazz, String value) {
if (!value.startsWith("${")) {
- logger.warn("property value for " + prefix + clazz.getName() + " does not start with '${'");
+ logger.warn("property value for {}{} does not start with '${'", prefix, clazz.getName());
return new NullExtractor();
}
if (!value.endsWith("}")) {
- logger.warn("property value for " + prefix + clazz.getName() + " does not end with '}'");
+ logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
return new NullExtractor();
}
// get the part in the middle
String val = value.substring(2, value.length() - 1);
if (val.startsWith(".")) {
- logger.warn("property value for " + prefix + clazz.getName() + " begins with '.'");
+ logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
return new NullExtractor();
}
if (val.endsWith(".")) {
- logger.warn("property value for " + prefix + clazz.getName() + " ends with '.'");
+ logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
return new NullExtractor();
}
@@ -198,7 +198,7 @@ public class ClassExtractors {
return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
} catch (ExtractorException e) {
- logger.warn("cannot build extractor for " + clazz.getName());
+ logger.warn("cannot build extractor for {}", clazz.getName(), e);
return new NullExtractor();
}
}
@@ -255,7 +255,7 @@ public class ClassExtractors {
@Override
public Object extract(Object object) {
- logger.warn("cannot extract " + type + " from " + object.getClass());
+ logger.warn("cannot extract {} from {}", type, object.getClass());
return null;
}
}
@@ -374,6 +374,7 @@ public class ClassExtractors {
} catch (NoSuchMethodException expected) {
// no getXxx() method, maybe there's a field by this name
+ logger.debug("no method {} in {}", nm, clazz.getName());
return null;
} catch (SecurityException e) {
@@ -407,9 +408,8 @@ public class ClassExtractors {
* @param key item key within the map
* @return a new extractor, or {@code null} if the class is not a Map
* subclass
- * @throws ExtractorException
*/
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) throws ExtractorException {
+ private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
if (!Map.class.isAssignableFrom(clazz)) {
return null;
@@ -445,6 +445,7 @@ public class ClassExtractors {
} catch (NoSuchFieldException expected) {
// no field by this name - try super class & interfaces
+ logger.debug("no field {} in {}", name, clazz.getName());
} catch (SecurityException e) {
throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
index 9ed32ae4..38d440cb 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling.extractor;
/**
* Used to extract an object contained within another object.
*/
+@FunctionalInterface
public interface Extractor {
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
index aff9d860..032ea47e 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
@@ -53,7 +53,7 @@ public class MapExtractor implements Extractor {
return map.get(key);
} else {
- logger.warn("expecting a map, instead of " + object.getClass());
+ logger.warn("expecting a map, instead of {}", object.getClass());
return null;
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index 8fd86c1e..ee871cbd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -37,9 +37,19 @@ public class BucketAssignments {
private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class);
/**
- * Number of buckets.
+ * The number of bits in the maximum number of buckets.
*/
- public static final int MAX_BUCKETS = 1024;
+ private static final int MAX_BUCKET_BITS = 10;
+
+ /**
+ * Maximum number of buckets. Must be a power of two.
+ */
+ public static final int MAX_BUCKETS = 1 << MAX_BUCKET_BITS;
+
+ /**
+ * Used to ensure that a hash code is not negative.
+ */
+ private static final int MAX_BUCKETS_MASK = MAX_BUCKETS - 1;
/**
* Identifies the host serving a particular bucket.
@@ -55,8 +65,8 @@ public class BucketAssignments {
/**
*
- * @param hostArray maps a bucket number (i.e., array index) to a host. All
- * values must be non-null
+ * @param hostArray maps a bucket number (i.e., array index) to a host. All values
+ * must be non-null
*/
public BucketAssignments(String[] hostArray) {
this.hostArray = hostArray;
@@ -97,8 +107,7 @@ public class BucketAssignments {
* Determines if a host has an assignment.
*
* @param host host to be checked
- * @return {@code true} if the host has an assignment, {@code false}
- * otherwise
+ * @return {@code true} if the host has an assignment, {@code false} otherwise
*/
@JsonIgnore
public boolean hasAssignment(String host) {
@@ -139,23 +148,17 @@ public class BucketAssignments {
/**
* Gets the host assigned to a given bucket.
*
- * @param bucket bucket number
- * @return the assigned host, or {@code null} if the bucket has no assigned
- * host
+ * @param hashCode hash code of the item whose assignment is desired
+ * @return the assigned host, or {@code null} if the item has no assigned host
*/
@JsonIgnore
- public String getAssignedHost(int bucket) {
- if (hostArray == null) {
+ public String getAssignedHost(int hashCode) {
+ if (hostArray == null || hostArray.length == 0) {
logger.error("no buckets have been assigned");
return null;
}
- if (bucket < 0 || bucket >= hostArray.length) {
- logger.error("invalid bucket number {} maximum {}", bucket, hostArray.length);
- return null;
- }
-
- return hostArray[bucket];
+ return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
@@ -169,8 +172,8 @@ public class BucketAssignments {
}
/**
- * Checks the validity of the assignments, verifying that all buckets have
- * been assigned to a host.
+ * Checks the validity of the assignments, verifying that all buckets have been
+ * assigned to a host.
*
* @throws PoolingFeatureException if the assignments are invalid
*/
@@ -208,8 +211,6 @@ public class BucketAssignments {
if (getClass() != obj.getClass())
return false;
BucketAssignments other = (BucketAssignments) obj;
- if (!Arrays.equals(hostArray, other.hostArray))
- return false;
- return true;
+ return Arrays.equals(hostArray, other.hostArray);
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
index b59cfbb2..6122d361 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
@@ -146,6 +146,7 @@ public class Forward extends Message {
}
+ @Override
@JsonIgnore
public void checkValidity() throws PoolingFeatureException {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
index 5f503a3b..b0a36cd9 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -24,16 +24,20 @@ import java.util.Arrays;
import java.util.TreeSet;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The active state. In this state, this host has one more more bucket
- * assignments and processes any events associated with one of its buckets.
- * Other events are forwarded to appropriate target hosts.
+ * The active state. In this state, this host has one more more bucket assignments and
+ * processes any events associated with one of its buckets. Other events are forwarded to
+ * appropriate target hosts.
*/
public class ActiveState extends ProcessingState {
+ private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
+
/**
* Set of hosts that have been assigned a bucket.
*/
@@ -50,8 +54,8 @@ public class ActiveState extends ProcessingState {
private String predHost = "";
/**
- * {@code True} if we saw this host's heart beat since the last check,
- * {@code false} otherwise.
+ * {@code True} if we saw this host's heart beat since the last check, {@code false}
+ * otherwise.
*/
private boolean myHeartbeatSeen = false;
@@ -74,14 +78,14 @@ public class ActiveState extends ProcessingState {
}
/**
- * Determine this host's neighbors based on the order of the host UUIDs.
- * Updates {@link #succHost} and {@link #predHost}.
+ * Determine this host's neighbors based on the order of the host UUIDs. Updates
+ * {@link #succHost} and {@link #predHost}.
*/
private void detmNeighbors() {
if (assigned.size() < 2) {
+ logger.info("this host has no neighbors on topic {}", getTopic());
/*
- * this host is the only one with any assignments - it has no
- * neighbors
+ * this host is the only one with any assignments - it has no neighbors
*/
succHost = null;
predHost = "";
@@ -91,11 +95,13 @@ public class ActiveState extends ProcessingState {
if ((succHost = assigned.higher(getHost())) == null) {
// wrapped around - successor is the first host in the set
succHost = assigned.first();
+ logger.info("this host's successor is {} on topic {}", succHost, getTopic());
}
if ((predHost = assigned.lower(getHost())) == null) {
// wrapped around - predecessor is the last host in the set
predHost = assigned.last();
+ logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
}
@@ -109,13 +115,14 @@ public class ActiveState extends ProcessingState {
* Adds the timers.
*/
private void addTimers() {
+ logger.info("add timers");
/*
* heart beat generator
*/
long genMs = getProperties().getActiveHeartbeatMs();
- scheduleWithFixedDelay(genMs, genMs, xxx -> {
+ scheduleWithFixedDelay(genMs, genMs, () -> {
genHeartbeat();
return null;
});
@@ -125,13 +132,14 @@ public class ActiveState extends ProcessingState {
*/
long interMs = getProperties().getInterHeartbeatMs();
- scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ scheduleWithFixedDelay(interMs, interMs, () -> {
if (myHeartbeatSeen) {
myHeartbeatSeen = false;
return null;
}
// missed my heart beat
+ logger.error("missed my heartbeat on topic {}", getTopic());
return internalTopicFailed();
});
@@ -141,13 +149,15 @@ public class ActiveState extends ProcessingState {
*/
if (!predHost.isEmpty()) {
- scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ scheduleWithFixedDelay(interMs, interMs, () -> {
if (predHeartbeatSeen) {
predHeartbeatSeen = false;
return null;
}
// missed the predecessor's heart beat
+ logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
+
publish(makeQuery());
return goQuery();
@@ -172,70 +182,85 @@ public class ActiveState extends ProcessingState {
String src = msg.getSource();
if (src == null) {
+ logger.warn("Heartbeat message has no source on topic {}", getTopic());
return null;
} else if (src.equals(getHost())) {
+ logger.info("saw my heartbeat on topic {}", getTopic());
myHeartbeatSeen = true;
} else if (src.equals(predHost)) {
+ logger.info("saw heartbeat from {} on topic {}", src, getTopic());
predHeartbeatSeen = true;
-
+
+ } else {
+ logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
}
return null;
}
@Override
+ public State process(Leader msg) {
+ if (!isValid(msg)) {
+ return null;
+ }
+
+ String src = msg.getSource();
+
+ if (getHost().compareTo(src) < 0) {
+ // our host would be a better leader - find out what's up
+ logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
+ return goQuery();
+ }
+
+ logger.info("have a new leader {} on topic {}", src, getTopic());
+
+ return goActive(msg.getAssignments());
+ }
+
+ @Override
public State process(Offline msg) {
String src = msg.getSource();
if (src == null) {
+ logger.warn("Offline message has no source on topic {}", getTopic());
return null;
} else if (!assigned.contains(src)) {
/*
- * the offline host wasn't assigned any buckets, so just ignore the
- * message
+ * the offline host wasn't assigned any buckets, so just ignore the message
*/
+ logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
return null;
} else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
/*
* Case 1: We are the leader.
*
- * Case 2: Our predecessor was the leader and it has gone offline -
- * we should become the leader.
+ * Case 2: Our predecessor was the leader and it has gone offline - we should
+ * become the leader.
*
- * In either case, we are now the leader and we must re-balance the
- * buckets since one of the hosts has gone offline.
+ * In either case, we are now the leader and we must re-balance the buckets
+ * since one of the hosts has gone offline.
*/
+ logger.info("Offline message from source {} on topic {}", src, getTopic());
+
assigned.remove(src);
return becomeLeader(assigned);
} else {
/*
- * Otherwise, we don't care right now - we'll wait for the leader to
- * tell us it's been removed.
+ * Otherwise, we don't care right now - we'll wait for the leader to tell us
+ * it's been removed.
*/
+ logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
return null;
}
}
- /**
- * Transitions to the query state.
- */
- @Override
- public State process(Query msg) {
- State next = super.process(msg);
- if (next != null) {
- return next;
- }
-
- return goQuery();
- }
-
protected String getSuccHost() {
return succHost;
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
index 27678360..4878c241 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/IdleState.java
@@ -21,11 +21,6 @@
package org.onap.policy.drools.pooling.state;
import org.onap.policy.drools.pooling.PoolingManager;
-import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Identification;
-import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.pooling.message.Query;
/**
* Idle state, used when offline.
@@ -35,51 +30,4 @@ public class IdleState extends State {
public IdleState(PoolingManager mgr) {
super(mgr);
}
-
- @Override
- public void stop() {
- // do nothing - don't even send of "offline" message
- }
-
- /**
- * Discards the message.
- */
- @Override
- public State process(Heartbeat msg) {
- return null;
- }
-
- /**
- * Discards the message.
- */
- @Override
- public State process(Identification msg) {
- return null;
- }
-
- /**
- * Copies the assignments, but doesn't change states.
- */
- @Override
- public State process(Leader msg) {
- super.process(msg);
- return null;
- }
-
- /**
- * Discards the message.
- */
- @Override
- public State process(Offline msg) {
- return null;
- }
-
- /**
- * Discards the message.
- */
- @Override
- public State process(Query msg) {
- return null;
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index 1c8e4dcc..6be2fb84 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -21,13 +21,19 @@
package org.onap.policy.drools.pooling.state;
import org.onap.policy.drools.pooling.PoolingManager;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The inactive state. In this state, we just wait a bit and then try to
- * re-activate. In the meantime, all messages are ignored.
+ * The inactive state. In this state, we just wait a bit and then try to re-activate. In
+ * the meantime, all messages are ignored.
*/
public class InactiveState extends State {
+ private static final Logger logger = LoggerFactory.getLogger(InactiveState.class);
+
/**
*
* @param mgr
@@ -41,11 +47,36 @@ public class InactiveState extends State {
super.start();
- schedule(getProperties().getReactivateMs(), xxx -> goStart());
+ schedule(getProperties().getReactivateMs(), () -> goStart());
+ }
+
+ @Override
+ public State process(Leader msg) {
+ if(isValid(msg)) {
+ logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+
+ if(msg.getAssignments().hasAssignment(getHost())) {
+ logger.info("received Leader message on topic {}", getTopic());
+ return goActive();
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Generates an Identification message and goes to the query state.
+ */
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
}
/**
- * Remains in this state.
+ * Remains in this state, without resetting any timers.
*/
@Override
protected State goInactive() {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
index 2f830c66..1e9bb581 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
@@ -30,15 +30,13 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Any state in which events are being processed locally and forwarded, as
- * appropriate.
+ * Any state in which events are being processed locally and forwarded, as appropriate.
*/
public class ProcessingState extends State {
@@ -52,8 +50,8 @@ public class ProcessingState extends State {
/**
*
* @param mgr
- * @param leader current known leader, which need not be the same as the
- * assignment leader. Never {@code null}
+ * @param leader current known leader, which need not be the same as the assignment
+ * leader. Never {@code null}
* @throws IllegalArgumentException if an argument is invalid
*/
public ProcessingState(PoolingManager mgr, String leader) {
@@ -76,21 +74,31 @@ public class ProcessingState extends State {
}
/**
- * Generates an Identification message and returns {@code null}.
+ * Goes active with a new set of assignments.
+ *
+ * @param asgn new assignments
+ * @return the new state, either Active or Inactive, depending on whether or not this
+ * host has an assignment
*/
- @Override
- public State process(Query msg) {
- publish(makeIdentification());
- return goQuery();
+ protected State goActive(BucketAssignments asgn) {
+ startDistributing(asgn);
+
+ if (asgn.hasAssignment(getHost())) {
+ return goActive();
+
+ } else {
+ return goInactive();
+ }
}
/**
- * Makes an Identification message.
- *
- * @return a new message
+ * Generates an Identification message and goes to the query state.
*/
- protected Identification makeIdentification() {
- return new Identification(getHost(), getAssignments());
+ @Override
+ public State process(Query msg) {
+ logger.info("received Query message on topic {}", getTopic());
+ publish(makeIdentification());
+ return goQuery();
}
/**
@@ -132,18 +140,17 @@ public class ProcessingState extends State {
}
/**
- * Becomes the leader. Publishes a Leader message and enters the
- * {@link ActiveState}.
+ * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
*
* @param alive hosts that are known to be alive
*
* @return the new state
*/
protected State becomeLeader(SortedSet<String> alive) {
- String leader = getHost();
+ String newLeader = getHost();
- if (!leader.equals(alive.first())) {
- throw new IllegalArgumentException(leader + " cannot replace " + alive.first());
+ if (!newLeader.equals(alive.first())) {
+ throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
}
Leader msg = makeLeader(alive);
@@ -155,8 +162,8 @@ public class ProcessingState extends State {
}
/**
- * Makes a leader message. Assumes "this" host is the leader, and thus
- * appears as the first host in the set of hosts that are still alive.
+ * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
+ * first host in the set of hosts that are still alive.
*
* @param alive hosts that are known to be alive
*
@@ -222,8 +229,8 @@ public class ProcessingState extends State {
}
/**
- * Removes excess hosts from the set of available hosts. Assumes "this" host
- * is the leader, and thus appears as the first host in the set.
+ * Removes excess hosts from the set of available hosts. Assumes "this" host is the
+ * leader, and thus appears as the first host in the set.
*
* @param maxHosts maximum number of hosts to be retained
* @param avail available hosts
@@ -231,9 +238,9 @@ public class ProcessingState extends State {
private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
while (avail.size() > maxHosts) {
/*
- * Don't remove this host, as it's the leader. Since the leader is
- * always at the front of the sorted set, we'll just pick off hosts
- * from the back of the set.
+ * Don't remove this host, as it's the leader. Since the leader is always at
+ * the front of the sorted set, we'll just pick off hosts from the back of the
+ * set.
*/
String host = avail.last();
avail.remove(host);
@@ -243,15 +250,15 @@ public class ProcessingState extends State {
}
/**
- * Adds bucket indices to {@link HostBucket} objects. Buckets that are
- * unassigned or assigned to a host that does not appear within the map are
- * re-assigned to a host that appears within the map.
+ * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
+ * assigned to a host that does not appear within the map are re-assigned to a host
+ * that appears within the map.
*
* @param bucket2host bucket assignments
* @param host2data maps a host name to its {@link HostBucket}
*/
private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
- LinkedList<Integer> nullBuckets = new LinkedList<Integer>();
+ LinkedList<Integer> nullBuckets = new LinkedList<>();
for (int x = 0; x < bucket2host.length; ++x) {
String host = bucket2host[x];
@@ -274,10 +281,9 @@ public class ProcessingState extends State {
}
/**
- * Assigns null buckets (i.e., those having no assignment) to available
- * hosts.
+ * Assigns null buckets (i.e., those having no assignment) to available hosts.
*
- * @param buckets available hosts
+ * @param buckets buckets that still need to be assigned to hosts
* @param coll collection of current host-bucket assignments
*/
private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
@@ -295,9 +301,9 @@ public class ProcessingState extends State {
}
/**
- * Re-balances the buckets, taking from those that have a larger count and
- * giving to those that have a smaller count. Populates an output array with
- * the new assignments.
+ * Re-balances the buckets, taking from those that have a larger count and giving to
+ * those that have a smaller count. Populates an output array with the new
+ * assignments.
*
* @param coll current bucket assignment
* @param bucket2host array to be populated with the new assignments
@@ -349,7 +355,7 @@ public class ProcessingState extends State {
/**
* Tracks buckets that have been assigned to a host.
*/
- public static class HostBucket implements Comparable<HostBucket> {
+ protected static class HostBucket implements Comparable<HostBucket> {
/**
* Host to which the buckets have been assigned.
*/
@@ -395,8 +401,8 @@ public class ProcessingState extends State {
}
/**
- * Compares host buckets, first by the number of buckets, and then by
- * the host name.
+ * Compares host buckets, first by the number of buckets, and then by the host
+ * name.
*/
@Override
public final int compareTo(HostBucket other) {
@@ -406,5 +412,15 @@ public class ProcessingState extends State {
}
return d;
}
+
+ @Override
+ public final int hashCode() {
+ throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ throw new UnsupportedOperationException("cannot compare HostBuckets");
+ }
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
index 57521960..9045165b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -26,26 +26,31 @@ import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Offline;
-
-// TODO add logging
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The Query state. In this state, the host waits for the other hosts to
- * identify themselves. Eventually, a leader should come forth. If not, it will
- * transition to the active or inactive state, depending on whether or not it
- * has an assignment in the current bucket assignments. The other possibility is
- * that it may <i>become</i> the leader, in which case it will also transition
- * to the active state.
+ * The Query state. In this state, the host waits for the other hosts to identify
+ * themselves. Eventually, a leader should come forth. If not, it will transition to the
+ * active or inactive state, depending on whether or not it has an assignment in the
+ * current bucket assignments. The other possibility is that it may <i>become</i> the
+ * leader, in which case it will also transition to the active state.
*/
public class QueryState extends ProcessingState {
+ private static final Logger logger = LoggerFactory.getLogger(QueryState.class);
+
/**
- * Hosts that have sent an "Identification" message. Always includes this
- * host.
+ * Hosts that have sent an "Identification" message. Always includes this host.
*/
private TreeSet<String> alive = new TreeSet<>();
/**
+ * {@code True} if we saw our own Identification method, {@code false} otherwise.
+ */
+ private boolean sawSelfIdent = false;
+
+ /**
*
* @param mgr
*/
@@ -71,44 +76,42 @@ public class QueryState extends ProcessingState {
private void awaitIdentification() {
/*
- * Once we've waited long enough for all Identification messages to
- * arrive, become the leader, assuming we should.
+ * Once we've waited long enough for all Identification messages to arrive, become
+ * the leader, assuming we should.
*/
- schedule(getProperties().getIdentificationMs(), xxx -> {
+ schedule(getProperties().getIdentificationMs(), () -> {
+
+ if (!sawSelfIdent) {
+ // didn't see our identification
+ logger.error("missed our own Ident message on topic {}", getTopic());
+ return internalTopicFailed();
- if (isLeader()) {
+ } else if (isLeader()) {
// "this" host is the new leader
+ logger.info("this host is the new leader for topic {}", getTopic());
return becomeLeader(alive);
} else if (hasAssignment()) {
/*
- * this host is not the new leader, but it does have an
- * assignment - return to the active state while we wait for the
- * leader
+ * this host is not the new leader, but it does have an assignment -
+ * return to the active state while we wait for the leader
*/
+ logger.info("no new leader on topic {}", getTopic());
return goActive();
} else {
// not the leader and no assignment yet
+ logger.info("no new leader on topic {}", getTopic());
return goInactive();
}
});
}
/**
- * Remains in this state.
- */
- @Override
- public State goQuery() {
- return null;
- }
-
- /**
* Determines if this host has an assignment in the CURRENT assignments.
*
- * @return {@code true} if this host has an assignment, {@code false}
- * otherwise
+ * @return {@code true} if this host has an assignment, {@code false} otherwise
*/
protected boolean hasAssignment() {
BucketAssignments asgn = getAssignments();
@@ -116,53 +119,73 @@ public class QueryState extends ProcessingState {
}
@Override
+ public State goQuery() {
+ return null;
+ }
+
+ @Override
public State process(Identification msg) {
- recordInfo(msg.getSource(), msg.getAssignments());
+ if (getHost().equals(msg.getSource())) {
+ logger.info("saw our own Ident message on topic {}", getTopic());
+ sawSelfIdent = true;
+
+ } else {
+ logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic());
+ recordInfo(msg.getSource(), msg.getAssignments());
+ }
return null;
}
/**
- * If the message leader is better than the leader we have, then go active
- * with it. Otherwise, simply treat it like an {@link Identification}
- * message.
+ * If the message leader is better than the leader we have, then go active with it.
+ * Otherwise, simply treat it like an {@link Identification} message.
*/
@Override
public State process(Leader msg) {
- BucketAssignments asgn = msg.getAssignments();
- if (asgn == null) {
+ if (!isValid(msg)) {
return null;
}
- // ignore Leader messages from ourself
String source = msg.getSource();
- if (source == null || source.equals(getHost())) {
- return null;
- }
-
- // the new leader must equal the source
- if (!source.equals(asgn.getLeader())) {
- return null;
- }
+ BucketAssignments asgn = msg.getAssignments();
- // go active, if this has a better leader than the one we have
- if (source.compareTo(getLeader()) < 0) {
- return super.process(msg);
+ // go active, if this has a leader that's the same or better than the one we have
+ if (source.compareTo(getLeader()) <= 0) {
+ logger.warn("leader with {} on topic {}", source, getTopic());
+ return goActive(asgn);
}
/*
- * The message does not have an acceptable leader, but we'll still
- * record its info.
+ * The message does not have an acceptable leader, but we'll still record its
+ * info.
*/
- recordInfo(msg.getSource(), msg.getAssignments());
+ logger.info("record leader info from {} on topic {}", source, getTopic());
+ recordInfo(source, asgn);
+
+ return null;
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ logger.warn("host {} offline on topic {}", host, getTopic());
+ alive.remove(host);
+ setLeader(alive.first());
+
+ } else {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ }
return null;
}
/**
- * Records info from a message, adding the source host name to
- * {@link #alive}, and updating the bucket assignments.
+ * Records info from a message, adding the source host name to {@link #alive}, and
+ * updating the bucket assignments.
*
* @param source the message's source host
* @param assignments assignments, or {@code null}
@@ -181,29 +204,18 @@ public class QueryState extends ProcessingState {
// record assignments, if we don't have any yet
BucketAssignments current = getAssignments();
if (current == null) {
+ logger.info("received initial assignments on topic {}", getTopic());
setAssignments(assignments);
return;
}
/*
- * Record assignments, if the new assignments have a better (i.e.,
- * lesser) leader.
+ * Record assignments, if the new assignments have a better (i.e., lesser) leader.
*/
String curldr = current.getLeader();
- if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) {
+ if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) {
+ logger.info("use new assignments from {} on topic {}", source, getTopic());
setAssignments(assignments);
}
}
-
- @Override
- public State process(Offline msg) {
- String host = msg.getSource();
-
- if (host != null && !host.equals(getHost())) {
- alive.remove(host);
- setLeader(alive.first());
- }
-
- return null;
- }
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index decbdfda..a978e245 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -28,19 +28,19 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Identification;
-import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
-import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The start state. Upon entry, a heart beat is generated and the event filter
- * is changed to look for just that particular message. Once the message is
- * seen, it goes into the {@link QueryState}.
+ * The start state. Upon entry, a heart beat is generated and the event filter is changed
+ * to look for just that particular message. Once the message is seen, it goes into the
+ * {@link QueryState}.
*/
public class StartState extends State {
+ private static final Logger logger = LoggerFactory.getLogger(StartState.class);
+
/**
* Time stamp inserted into the heart beat message.
*/
@@ -69,54 +69,28 @@ public class StartState extends State {
publish(getHost(), makeHeartbeat(hbTimestampMs));
- schedule(getProperties().getStartHeartbeatMs(), xxx -> internalTopicFailed());
+ schedule(getProperties().getStartHeartbeatMs(), () -> {
+ logger.error("missed heartbeat on topic {}", getTopic());
+ return internalTopicFailed();
+ });
}
/**
- * Transitions to the query state if the heart beat originated from this
- * host and its time stamp matches.
+ * Transitions to the query state if the heart beat originated from this host and its
+ * time stamp matches.
*/
@Override
public State process(Heartbeat msg) {
if (msg.getTimestampMs() == hbTimestampMs && getHost().equals(msg.getSource())) {
// saw our own heart beat - transition to query state
+ logger.info("saw our own heartbeat on topic {}", getTopic());
publish(makeQuery());
return goQuery();
- }
-
- return null;
- }
-
- /**
- * Discards the message.
- */
- @Override
- public State process(Identification msg) {
- return null;
- }
-
- /**
- * Processes the assignments, but remains in the current state.
- */
- @Override
- public State process(Leader msg) {
- super.process(msg);
- return null;
- }
- /**
- * Discards the message.
- */
- @Override
- public State process(Offline msg) {
- return null;
- }
+ } else {
+ logger.info("ignored old heartbeat message from {} on topic {}", msg.getSource(), getTopic());
+ }
- /**
- * Discards the message.
- */
- @Override
- public State process(Query msg) {
return null;
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 1e3a907e..421b9225 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -26,7 +26,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -37,16 +37,19 @@ import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A state in the finite state machine.
* <p>
- * A state may have several timers associated with it, which must be cancelled
- * whenever the state is changed. Assumes that timers are not continuously added
- * to the same state.
+ * A state may have several timers associated with it, which must be cancelled whenever
+ * the state is changed. Assumes that timers are not continuously added to the same state.
*/
public abstract class State {
+ private static final Logger logger = LoggerFactory.getLogger(State.class);
+
/**
* Host pool manager.
*/
@@ -55,7 +58,7 @@ public abstract class State {
/**
* Timers added by this state.
*/
- private final List<ScheduledFuture<?>> timers = new LinkedList<>();
+ private final List<CancellableScheduledTask> timers = new LinkedList<>();
/**
*
@@ -66,9 +69,9 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic.
- * The default method returns a filter that accepts messages on the admin
- * channel and on the host's own channel.
+ * Gets the server-side filter to use when polling the DMaaP internal topic. The
+ * default method returns a filter that accepts messages on the admin channel and on
+ * the host's own channel.
*
* @return the server-side filter to use.
*/
@@ -80,25 +83,15 @@ public abstract class State {
/**
* Cancels the timers added by this state.
*/
- public void cancelTimers() {
- for (ScheduledFuture<?> fut : timers) {
- fut.cancel(false);
- }
+ public final void cancelTimers() {
+ timers.forEach(timer -> timer.cancel());
}
/**
- * Starts the state.
+ * Starts the state. The default method simply logs a message and returns.
*/
public void start() {
-
- }
-
- /**
- * Indicates that the finite state machine is stopping. Sends an "offline"
- * message to the other hosts.
- */
- public void stop() {
- publish(makeOffline());
+ logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
}
/**
@@ -106,7 +99,7 @@ public abstract class State {
*
* @return the new state
*/
- public State goStart() {
+ public final State goStart() {
return mgr.goStart();
}
@@ -124,7 +117,7 @@ public abstract class State {
*
* @return the new state
*/
- public State goActive() {
+ public final State goActive() {
return mgr.goActive();
}
@@ -138,13 +131,14 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to
- * handle and returns {@code null}.
+ * Processes a message. The default method passes it to the manager to handle and
+ * returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
+ logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
mgr.handle(msg);
return null;
}
@@ -156,6 +150,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Heartbeat msg) {
+ logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -166,41 +161,54 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Identification msg) {
+ logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
/**
- * Processes a message. If this host has a new assignment, then it
- * transitions to the active state. Otherwise, it transitions to the
- * inactive state.
+ * Processes a message. The default method copies the assignments and then returns
+ * {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Leader msg) {
+ if (isValid(msg)) {
+ logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * Determines if a message is valid and did not originate from this host.
+ *
+ * @param msg message to be validated
+ * @return {@code true} if the message is valid, {@code false} otherwise
+ */
+ protected boolean isValid(Leader msg) {
BucketAssignments asgn = msg.getAssignments();
if (asgn == null) {
- return null;
+ logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
+ return false;
}
+ // ignore Leader messages from ourself
String source = msg.getSource();
- if (source == null) {
- return null;
+ if (source == null || source.equals(getHost())) {
+ logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
+ return false;
}
// the new leader must equal the source
- if (source.equals(asgn.getLeader())) {
- startDistributing(asgn);
-
- if (asgn.hasAssignment(getHost())) {
- return goActive();
+ boolean result = source.equals(asgn.getLeader());
- } else {
- return goInactive();
- }
+ if (!result) {
+ logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
}
- return null;
+ return result;
}
/**
@@ -210,6 +218,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Offline msg) {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -220,6 +229,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Query msg) {
+ logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -228,7 +238,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Identification msg) {
+ protected final void publish(Identification msg) {
mgr.publishAdmin(msg);
}
@@ -237,7 +247,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Leader msg) {
+ protected final void publish(Leader msg) {
mgr.publishAdmin(msg);
}
@@ -246,7 +256,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Offline msg) {
+ protected final void publish(Offline msg) {
mgr.publishAdmin(msg);
}
@@ -255,7 +265,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Query msg) {
+ protected final void publish(Query msg) {
mgr.publishAdmin(msg);
}
@@ -265,7 +275,7 @@ public abstract class State {
* @param channel
* @param msg message to be published
*/
- protected void publish(String channel, Forward msg) {
+ protected final void publish(String channel, Forward msg) {
mgr.publish(channel, msg);
}
@@ -275,7 +285,7 @@ public abstract class State {
* @param channel
* @param msg message to be published
*/
- protected void publish(String channel, Heartbeat msg) {
+ protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}
@@ -284,7 +294,7 @@ public abstract class State {
*
* @param assignments
*/
- protected void startDistributing(BucketAssignments assignments) {
+ protected final void startDistributing(BucketAssignments assignments) {
if (assignments != null) {
mgr.startDistributing(assignments);
}
@@ -296,7 +306,7 @@ public abstract class State {
* @param delayMs
* @param task
*/
- protected void schedule(long delayMs, StateTimerTask task) {
+ protected final void schedule(long delayMs, StateTimerTask task) {
timers.add(mgr.schedule(delayMs, task));
}
@@ -307,7 +317,7 @@ public abstract class State {
* @param delayMs
* @param task
*/
- protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
}
@@ -316,7 +326,7 @@ public abstract class State {
*
* @return a new {@link InactiveState}
*/
- protected State internalTopicFailed() {
+ protected final State internalTopicFailed() {
publish(makeOffline());
mgr.internalTopicFailed();
@@ -330,16 +340,25 @@ public abstract class State {
*
* @return a new message
*/
- protected Heartbeat makeHeartbeat(long timestampMs) {
+ protected final Heartbeat makeHeartbeat(long timestampMs) {
return new Heartbeat(getHost(), timestampMs);
}
/**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
* Makes an "offline" message.
*
* @return a new message
*/
- protected Offline makeOffline() {
+ protected final Offline makeOffline() {
return new Offline(getHost());
}
@@ -348,7 +367,7 @@ public abstract class State {
*
* @return a new message
*/
- protected Query makeQuery() {
+ protected final Query makeQuery() {
return new Query(getHost());
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
index bd388b4e..346d4496 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StateTimerTask.java
@@ -29,9 +29,8 @@ public interface StateTimerTask {
/**
* Fires the timer.
*
- * @param arg always {@code null}
* @return the new state, or {@code null} if the state is unchanged
*/
- public State fire(Void arg);
+ public State fire();
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index f68f2395..a91671fd 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -21,6 +21,8 @@
package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
@@ -211,11 +213,11 @@ public class DmaapManagerTest {
expectException("startPublisher,start", xxx -> mgr.startPublisher());
expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
-
+
// allow it to succeed this time
reset(sink);
when(sink.send(any())).thenReturn(true);
-
+
mgr.startPublisher();
verify(sink).start();
@@ -227,29 +229,64 @@ public class DmaapManagerTest {
@Test
public void testStopPublisher() throws PoolingFeatureException {
// not publishing yet, so stopping should have no effect
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink, never()).stop();
-
+
// now start it
mgr.startPublisher();
-
+
// this time, stop should do something
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink).stop();
-
+
// re-stopping should have no effect
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink).stop();
}
@Test
+ public void testStopPublisher_WithDelay() throws PoolingFeatureException {
+
+ mgr.startPublisher();
+
+ long tbeg = System.currentTimeMillis();
+
+ mgr.stopPublisher(100L);
+
+ assertTrue(System.currentTimeMillis() >= tbeg + 100L);
+ }
+
+ @Test
+ public void testStopPublisher_WithDelayInterrupted() throws Exception {
+
+ mgr.startPublisher();
+
+ long minms = 2000L;
+
+ // tell the publisher to stop in minms + additional time
+ Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+ thread.start();
+
+ // give the thread a chance to start
+ Thread.sleep(50L);
+
+ // interrupt it - it should immediately finish its work
+ thread.interrupt();
+
+ // wait for it to stop, but only wait the minimum time
+ thread.join(minms);
+
+ assertFalse(thread.isAlive());
+ }
+
+ @Test
public void testStopPublisher_Exception() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// force exception when it stops
doThrow(new IllegalStateException("expected")).when(sink).stop();
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
}
@Test
@@ -270,14 +307,14 @@ public class DmaapManagerTest {
// not consuming yet, so stopping should have no effect
mgr.stopConsumer(listener);
verify(source, never()).unregister(any());
-
+
// now start it
mgr.startConsumer(listener);
-
+
// this time, stop should do something
mgr.stopConsumer(listener);
verify(source).unregister(listener);
-
+
// re-stopping should have no effect
mgr.stopConsumer(listener);
verify(source).unregister(listener);
@@ -292,7 +329,7 @@ public class DmaapManagerTest {
public void testSetFilter_Exception() throws PoolingFeatureException {
// force an error when setFilter() is called
doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
-
+
mgr.setFilter(FILTER);
}
@@ -300,41 +337,41 @@ public class DmaapManagerTest {
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
expectException("publish,pre", xxx -> mgr.publish(MSG));
-
+
mgr.startPublisher();
-
+
// publish several messages
mgr.publish(MSG);
verify(sink).send(MSG);
-
- mgr.publish(MSG+"a");
- verify(sink).send(MSG+"a");
-
- mgr.publish(MSG+"b");
- verify(sink).send(MSG+"b");
-
+
+ mgr.publish(MSG + "a");
+ verify(sink).send(MSG + "a");
+
+ mgr.publish(MSG + "b");
+ verify(sink).send(MSG + "b");
+
// stop and verify we can no longer publish
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
expectException("publish,stopped", xxx -> mgr.publish(MSG));
}
@Test(expected = PoolingFeatureException.class)
public void testPublish_SendFailed() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// arrange for send() to fail
when(sink.send(MSG)).thenReturn(false);
-
+
mgr.publish(MSG);
}
@Test(expected = PoolingFeatureException.class)
public void testPublish_SendEx() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// arrange for send() to throw an exception
doThrow(new IllegalStateException("expected")).when(sink).send(MSG);
-
+
mgr.publish(MSG);
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java
deleted file mode 100644
index 5f918f73..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureEnabledCheckerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.generalize;
-import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
-import java.util.Properties;
-import org.junit.Test;
-
-public class FeatureEnabledCheckerTest {
-
- private static final String PROP_NAME = "enable.{?.}it";
-
- private static final String SPEC = "my.specializer";
-
- @Test
- public void test() {
- assertFalse(check(null, null));
- assertTrue(check(null, true));
- assertFalse(check(null, false));
-
- assertTrue(check(true, null));
- assertTrue(check(true, true));
- assertFalse(check(true, false));
-
- assertFalse(check(false, null));
- assertTrue(check(false, true));
- assertFalse(check(false, false));
- }
-
- /**
- * Adds properties, as specified, and checks if the feature is enabled.
- *
- * @param wantGen value to assign to the generalized property, or
- * {@code null} to leave it unset
- * @param wantSpec value to assign to the specialized property, or
- * {@code null} to leave it unset
- * @return {@code true} if the feature is enabled, {@code false} otherwise
- */
- public boolean check(Boolean wantGen, Boolean wantSpec) {
- Properties props = new Properties();
-
- if (wantGen != null) {
- props.setProperty(generalize(PROP_NAME), wantGen.toString());
- }
-
- if (wantSpec != null) {
- props.setProperty(specialize(PROP_NAME, SPEC), wantSpec.toString());
- }
-
- return FeatureEnabledChecker.isFeatureEnabled(props, SPEC, PROP_NAME);
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
new file mode 100644
index 00000000..13d70f52
--- /dev/null
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -0,0 +1,1158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
+import org.onap.policy.drools.event.comm.Topic;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object.
+ */
+public class FeatureTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
+
+ /**
+ * Name of the topic used for inter-host communication.
+ */
+ private static final String INTERNAL_TOPIC = "my.internal.topic";
+
+ /**
+ * Name of the topic from which "external" events "arrive".
+ */
+ private static final String EXTERNAL_TOPIC = "my.external.topic";
+
+ /**
+ * Name of the controller.
+ */
+ private static final String CONTROLLER1 = "controller.one";
+
+ // private static final long STD_HEARTBEAT_WAIT_MS = 100;
+ // private static final long STD_REACTIVATE_WAIT_MS = 200;
+ // private static final long STD_IDENTIFICATION_MS = 60;
+ // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
+ // private static final long STD_INTER_HEARTBEAT_MS = 50;
+ // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ // private static final long POLL_MS = 2;
+ // private static final long INTER_POLL_MS = 2;
+ // private static final long EVENT_WAIT_SEC = 5;
+
+ // use these to slow things down
+ private static final long STD_HEARTBEAT_WAIT_MS = 5000;
+ private static final long STD_REACTIVATE_WAIT_MS = 10000;
+ private static final long STD_IDENTIFICATION_MS = 10000;
+ private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
+ private static final long STD_INTER_HEARTBEAT_MS = 12000;
+ private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+ private static final long POLL_MS = 2;
+ private static final long INTER_POLL_MS = 2000;
+ private static final long EVENT_WAIT_SEC = 1000;
+
+ // these are saved and restored on exit from this test class
+ private static PoolingFeature.Factory saveFeatureFactory;
+ private static PoolingManagerImpl.Factory saveManagerFactory;
+ private static DmaapManager.Factory saveDmaapFactory;
+
+ /**
+ * Context for the current test case.
+ */
+ private Context ctx;
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveFeatureFactory = PoolingFeature.getFactory();
+ saveManagerFactory = PoolingManagerImpl.getFactory();
+ saveDmaapFactory = DmaapManager.getFactory();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PoolingFeature.setFactory(saveFeatureFactory);
+ PoolingManagerImpl.setFactory(saveManagerFactory);
+ DmaapManager.setFactory(saveDmaapFactory);
+ }
+
+ @Before
+ public void setUp() {
+ ctx = null;
+ }
+
+ @After
+ public void tearDown() {
+ if (ctx != null) {
+ ctx.destroy();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void test_SingleHost() throws Exception {
+ int nmessages = 70;
+
+ ctx = new Context(nmessages);
+
+ ctx.addHost();
+ ctx.startHosts();
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ @Ignore
+ @Test
+ public void test_TwoHosts() throws Exception {
+ int nmessages = 200;
+
+ ctx = new Context(nmessages);
+
+ ctx.addHost();
+ ctx.addHost();
+ ctx.startHosts();
+
+ for (int x = 0; x < nmessages; ++x) {
+ ctx.offerExternal(makeMessage(x));
+ }
+
+ // wait for all hosts to have time to process a few messages
+ Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
+
+ // pause a topic for a bit
+// ctx.pauseTopic();
+
+ // now we'll see if it recovers
+
+ ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(0, ctx.getDecodeErrors());
+ assertEquals(0, ctx.getRemainingEvents());
+ ctx.checkAllSawAMsg();
+ }
+
+ private String makeMessage(int reqnum) {
+ return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+ }
+
+ /**
+ * Context used for a single test case.
+ */
+ private static class Context {
+
+ private final FeatureFactory featureFactory;
+ private final ManagerFactory managerFactory;
+ private final DmaapFactory dmaapFactory;
+
+ /**
+ * Hosts that have been added to this context.
+ */
+ private final Deque<Host> hosts = new LinkedList<>();
+
+ /**
+ * Maps a drools controller to its policy controller.
+ */
+ private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+ /**
+ * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
+ */
+ private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
+
+ /**
+ * Queue for the external "DMaaP" topic.
+ */
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
+ /**
+ * Counts the number of decode errors.
+ */
+ private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
+
+ /**
+ * Number of events we're still waiting to receive.
+ */
+ private final CountDownLatch eventCounter;
+
+ /**
+ * Maps host name to its topic source. This must be in sorted order so we can
+ * identify the source for the host with the higher name.
+ */
+ private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
+
+ /**
+ * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
+ * {@link #getCurrentHost()}.
+ */
+ private Host currentHost = null;
+
+ /**
+ *
+ * @param nEvents number of events to be processed
+ */
+ public Context(int nEvents) {
+ featureFactory = new FeatureFactory(this);
+ managerFactory = new ManagerFactory(this);
+ dmaapFactory = new DmaapFactory(this);
+ eventCounter = new CountDownLatch(nEvents);
+
+ PoolingFeature.setFactory(featureFactory);
+ PoolingManagerImpl.setFactory(managerFactory);
+ DmaapManager.setFactory(dmaapFactory);
+ }
+
+ /**
+ * Destroys the context, stopping any hosts that remain.
+ */
+ public void destroy() {
+ stopHosts();
+ hosts.clear();
+ }
+
+ /**
+ * Creates and adds a new host to the context.
+ */
+ public void addHost() {
+ hosts.add(new Host(this));
+ }
+
+ /**
+ * Starts the hosts.
+ */
+ public void startHosts() {
+ hosts.forEach(host -> host.start());
+ }
+
+ /**
+ * Stops the hosts.
+ */
+ public void stopHosts() {
+ hosts.forEach(host -> host.stop());
+ }
+
+ /**
+ * Verifies that all hosts processed at least one message.
+ */
+ public void checkAllSawAMsg() {
+ int x = 0;
+ for (Host host : hosts) {
+ assertTrue("x=" + x, host.messageSeen());
+ ++x;
+ }
+ }
+
+ /**
+ * Sets {@link #currentHost} to the specified host, and then invokes the given
+ * function. Resets {@link #currentHost} to {@code null} before returning.
+ *
+ * @param host
+ * @param func function to invoke
+ */
+ public void withHost(Host host, VoidFunction func) {
+ currentHost = host;
+ func.apply();
+ currentHost = null;
+ }
+
+ /**
+ * Offers an event to the external topic.
+ *
+ * @param event
+ */
+ public void offerExternal(String event) {
+ externalTopic.offer(event);
+ }
+
+ /**
+ * Adds an internal channel to the set of channels.
+ *
+ * @param channel
+ * @param queue the channel's queue
+ */
+ public void addInternal(String channel, BlockingQueue<String> queue) {
+ channel2queue.put(channel, queue);
+ }
+
+ /**
+ * Offers a message to all internal channels.
+ *
+ * @param message
+ */
+ public void offerInternal(String message) {
+ channel2queue.values().forEach(queue -> queue.offer(message));
+ }
+
+ /**
+ * Offers amessage to an internal channel.
+ *
+ * @param channel
+ * @param message
+ */
+ public void offerInternal(String channel, String message) {
+ BlockingQueue<String> queue = channel2queue.get(channel);
+ if (queue != null) {
+ queue.offer(message);
+ }
+ }
+
+ /**
+ * Decodes an event.
+ *
+ * @param event
+ * @return the decoded event, or {@code null} if it cannot be decoded
+ */
+ public Object decodeEvent(String event) {
+ return managerFactory.decodeEvent(null, null, event);
+ }
+
+ /**
+ * Associates a controller with its drools controller.
+ *
+ * @param controller
+ * @param droolsController
+ */
+ public void addController(PolicyController controller, DroolsController droolsController) {
+ drools2policy.put(droolsController, controller);
+ }
+
+ /**
+ * @param droolsController
+ * @return the controller associated with a drools controller, or {@code null} if
+ * it has no associated controller
+ */
+ public PolicyController getController(DroolsController droolsController) {
+ return drools2policy.get(droolsController);
+ }
+
+ /**
+ * @return queue for the external topic
+ */
+ public BlockingQueue<String> getExternalTopic() {
+ return externalTopic;
+ }
+
+ /**
+ *
+ * @return the number of decode errors so far
+ */
+ public int getDecodeErrors() {
+ return nDecodeErrors.get();
+ }
+
+ /**
+ * Increments the count of decode errors.
+ */
+ public void bumpDecodeErrors() {
+ nDecodeErrors.incrementAndGet();
+ }
+
+ /**
+ *
+ * @return the number of events that haven't been processed
+ */
+ public long getRemainingEvents() {
+ return eventCounter.getCount();
+ }
+
+ /**
+ * Adds an event to the counter.
+ */
+ public void addEvent() {
+ eventCounter.countDown();
+ }
+
+ /**
+ * Waits, for a period of time, for all events to be processed.
+ *
+ * @param time
+ * @param units
+ * @return {@code true} if all events have been processed, {@code false} otherwise
+ * @throws InterruptedException
+ */
+ public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+ return eventCounter.await(time, units);
+ }
+
+ /**
+ * Associates a host with a topic.
+ *
+ * @param host
+ * @param topic
+ */
+ public void addTopicSource(String host, TopicSourceImpl topic) {
+ host2topic.put(host, topic);
+ }
+
+ /**
+ * Pauses the last topic source long enough to miss a heart beat.
+ */
+ public void pauseTopic() {
+ Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
+ if (ent != null) {
+ ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
+ }
+ }
+
+ /**
+ * Gets the current host, provided this is used from within a call to
+ * {@link #withHost(Host, VoidFunction)}.
+ *
+ * @return the current host, or {@code null} if there is no current host
+ */
+ public Host getCurrentHost() {
+ return currentHost;
+ }
+ }
+
+ /**
+ * Simulates a single "host".
+ */
+ private static class Host {
+
+ private final Context context;
+
+ private final PoolingFeature feature = new PoolingFeature();
+
+ /**
+ * {@code True} if this host has processed a message, {@code false} otherwise.
+ */
+ private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+ /**
+ * This host's internal "DMaaP" topic.
+ */
+ private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * Source that reads from the external topic and posts to the listener.
+ */
+ private TopicSource externalSource;
+
+ // mock objects
+ private final PolicyEngine engine = mock(PolicyEngine.class);
+ private final ListenerController controller = mock(ListenerController.class);
+ private final DroolsController drools = mock(DroolsController.class);
+
+ /**
+ *
+ * @param context
+ */
+ public Host(Context context) {
+ this.context = context;
+
+ when(controller.getName()).thenReturn(CONTROLLER1);
+ when(controller.getDrools()).thenReturn(drools);
+
+ // stop consuming events if the controller stops
+ when(controller.stop()).thenAnswer(args -> {
+ externalSource.unregister(controller);
+ return true;
+ });
+
+ doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+ context.addController(controller, drools);
+
+ // arrange to read from the external topic
+ externalSource = new TopicSourceImpl(context, false);
+ }
+
+ /**
+ * Gets the host name. This should only be invoked within {@link #start()}.
+ *
+ * @return the host name
+ */
+ public String getName() {
+ return PoolingManagerImpl.getLastHost();
+ }
+
+ /**
+ * Starts threads for the host so that it begins consuming from both the external
+ * "DMaaP" topic and its own internal "DMaaP" topic.
+ */
+ public void start() {
+
+ context.withHost(this, () -> {
+
+ feature.beforeStart(engine);
+ feature.afterCreate(controller);
+
+ // assign the queue for this host's internal topic
+ context.addInternal(getName(), msgQueue);
+
+ feature.beforeStart(controller);
+
+ // start consuming events from the external topic
+ externalSource.register(controller);
+
+ feature.afterStart(controller);
+ });
+ }
+
+ /**
+ * Stops the host's threads.
+ */
+ public void stop() {
+ feature.beforeStop(controller);
+ externalSource.unregister(controller);
+ feature.afterStop(controller);
+ }
+
+ /**
+ * Offers an event to the feature, before the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ return feature.beforeOffer(controller, protocol, topic2, event);
+ }
+
+ /**
+ * Offers an event to the feature, after the policy controller handles it.
+ *
+ * @param protocol
+ * @param topic
+ * @param event
+ * @param success
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+ return feature.afterOffer(controller, protocol, topic, event, success);
+ }
+
+ /**
+ * Offers an event to the feature, before the drools controller handles it.
+ *
+ * @param fact
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean beforeInsert(Object fact) {
+ return feature.beforeInsert(drools, fact);
+ }
+
+ /**
+ * Offers an event to the feature, after the drools controller handles it.
+ *
+ * @param fact
+ * @param successInsert {@code true} if it was successfully inserted by the drools
+ * controller, {@code false} otherwise
+ * @return {@code true} if the event was handled, {@code false} otherwise
+ */
+ public boolean afterInsert(Object fact, boolean successInsert) {
+ return feature.afterInsert(drools, fact, successInsert);
+ }
+
+ /**
+ * Indicates that a message was seen for this host.
+ */
+ public void sawMessage() {
+ sawMsg.set(true);
+ }
+
+ /**
+ *
+ * @return {@code true} if a message was seen for this host, {@code false}
+ * otherwise
+ */
+ public boolean messageSeen() {
+ return sawMsg.get();
+ }
+
+ /**
+ * @return the queue associated with this host's internal topic
+ */
+ public BlockingQueue<String> getInternalQueue() {
+ return msgQueue;
+ }
+ }
+
+ /**
+ * Listener for the external topic. Simulates the actions taken by
+ * <i>AggregatedPolicyController.onTopicEvent</i>.
+ */
+ private static class MyExternalTopicListener implements Answer<Void> {
+
+ private final Context context;
+ private final Host host;
+
+ public MyExternalTopicListener(Context context, Host host) {
+ this.context = context;
+ this.host = host;
+ }
+
+ @Override
+ public Void answer(InvocationOnMock args) throws Throwable {
+ int i = 0;
+ CommInfrastructure commType = args.getArgument(i++);
+ String topic = args.getArgument(i++);
+ String event = args.getArgument(i++);
+
+ if (host.beforeOffer(commType, topic, event)) {
+ return null;
+ }
+
+ boolean result;
+ Object fact = context.decodeEvent(event);
+
+ if (fact == null) {
+ result = false;
+ context.bumpDecodeErrors();
+
+ } else {
+ result = true;
+
+ if (!host.beforeInsert(fact)) {
+ // feature did not handle it so we handle it here
+ host.afterInsert(fact, result);
+
+ host.sawMessage();
+ context.addEvent();
+ }
+ }
+
+ host.afterOffer(commType, topic, event, result);
+ return null;
+ }
+ }
+
+ /**
+ * Sink implementation that puts a message on the queue specified by the
+ * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
+ * message is placed on all queues.
+ */
+ private static class TopicSinkImpl extends TopicImpl implements TopicSink {
+
+ private final Context context;
+
+ /**
+ * Used to decode the messages so that the channel can be extracted.
+ */
+ private final Serializer serializer = new Serializer();
+
+ /**
+ *
+ * @param context
+ */
+ public TopicSinkImpl(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public synchronized boolean send(String message) {
+ if (!isAlive()) {
+ return false;
+ }
+
+ try {
+ Message msg = serializer.decodeMsg(message);
+ String channel = msg.getChannel();
+
+ if (Message.ADMIN.equals(channel)) {
+ // add to every queue
+ context.offerInternal(message);
+
+ } else {
+ // add to a specific queue
+ context.offerInternal(channel, message);
+ }
+
+ return true;
+
+ } catch (IOException e) {
+ logger.warn("could not decode message: {}", message);
+ context.bumpDecodeErrors();
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Source implementation that reads from a queue associated with a topic.
+ */
+ private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+
+ private final String topic;
+
+ /**
+ * Queue from which to retrieve messages.
+ */
+ private final BlockingQueue<String> queue;
+
+ /**
+ * Manages the current consumer thread. The "first" item is used as a trigger to
+ * tell the thread to stop processing, while the "second" item is triggered <i>by
+ * the thread</i> when it completes.
+ */
+ private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
+
+ /**
+ * Time, in milliseconds, to pause before polling for more messages.
+ */
+ private AtomicLong pauseTimeMs = new AtomicLong(0);
+
+ /**
+ *
+ * @param context
+ * @param internal {@code true} if to read from the internal topic, {@code false}
+ * to read from the external topic
+ */
+ public TopicSourceImpl(Context context, boolean internal) {
+ if (internal) {
+ Host host = context.getCurrentHost();
+
+ this.topic = INTERNAL_TOPIC;
+ this.queue = host.getInternalQueue();
+
+ context.addTopicSource(host.getName(), this);
+
+ } else {
+ this.topic = EXTERNAL_TOPIC;
+ this.queue = context.getExternalTopic();
+ }
+ }
+
+ @Override
+ public void setFilter(String filter) {
+ logger.info("topic filter set to: {}", filter);
+ }
+
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean offer(String event) {
+ throw new UnsupportedOperationException("offer topic source");
+ }
+
+ /**
+ * Starts a thread that takes messages from the queue and gives them to the
+ * listener. Stops the thread of any previously registered listener.
+ */
+ @Override
+ public void register(TopicListener listener) {
+ Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
+
+ reregister(newPair);
+
+ new Thread(() -> {
+ try {
+ do {
+ processMessages(newPair.first(), listener);
+ } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+
+ logger.info("topic source thread completed");
+
+ } catch (InterruptedException e) {
+ logger.warn("topic source thread aborted", e);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException e) {
+ logger.warn("topic source thread aborted", e);
+ }
+
+ newPair.second().countDown();
+
+ }).start();
+ }
+
+ /**
+ * Stops the thread of <i>any</i> currently registered listener.
+ */
+ @Override
+ public void unregister(TopicListener listener) {
+ reregister(null);
+ }
+
+ /**
+ * Registers a new "pair" with this source, stopping the consumer associated with
+ * any previous registration.
+ *
+ * @param newPair the new "pair", or {@code null} to unregister
+ */
+ private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
+ try {
+ Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
+ if (oldPair == null) {
+ if (newPair == null) {
+ // unregister was invoked twice in a row
+ logger.warn("re-unregister for topic source");
+ }
+
+ // no previous thread to stop
+ return;
+ }
+
+ // need to stop the previous thread
+
+ // tell it to stop
+ oldPair.first().countDown();
+
+ // wait for it to stop
+ if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
+ logger.warn("old topic registration is still running");
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("old topic registration may still be running", e);
+ Thread.currentThread().interrupt();
+ }
+
+ if (newPair != null) {
+ // register was invoked twice in a row
+ logger.warn("re-register for topic source");
+ }
+ }
+
+ /**
+ * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
+ * pause a bit.
+ *
+ * @param timeMs time, in milliseconds, to pause
+ */
+ public void pause(long timeMs) {
+ pauseTimeMs.set(timeMs);
+ }
+
+ /**
+ * Polls for messages from the topic and offers them to the listener. If
+ * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
+ * then immediately returns.
+ *
+ * @param stopped triggered if processing should stop
+ * @param listener
+ * @throws InterruptedException
+ */
+ private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
+
+ for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
+
+ long ptm = pauseTimeMs.getAndSet(0);
+ if (ptm != 0) {
+ logger.warn("pause processing");
+ stopped.await(ptm, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+ if (msg == null) {
+ return;
+ }
+
+ listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
+ }
+ }
+ }
+
+ /**
+ * Topic implementation. Most methods just throw
+ * {@link UnsupportedOperationException}.
+ */
+ private static class TopicImpl implements Topic {
+
+ /**
+ * {@code True} if this topic is alive/running, {@code false} otherwise.
+ */
+ private boolean alive = false;
+
+ /**
+ *
+ */
+ public TopicImpl() {
+ super();
+ }
+
+ @Override
+ public String getTopic() {
+ return INTERNAL_TOPIC;
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ throw new UnsupportedOperationException("topic protocol");
+ }
+
+ @Override
+ public List<String> getServers() {
+ throw new UnsupportedOperationException("topic servers");
+ }
+
+ @Override
+ public String[] getRecentEvents() {
+ throw new UnsupportedOperationException("topic events");
+ }
+
+ @Override
+ public void register(TopicListener topicListener) {
+ throw new UnsupportedOperationException("register topic");
+ }
+
+ @Override
+ public void unregister(TopicListener topicListener) {
+ throw new UnsupportedOperationException("unregister topic");
+ }
+
+ @Override
+ public synchronized boolean start() {
+ if (alive) {
+ throw new IllegalStateException("topic already started");
+ }
+
+ alive = true;
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ if (!alive) {
+ throw new IllegalStateException("topic is not running");
+ }
+
+ alive = false;
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ alive = false;
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return alive;
+ }
+
+ @Override
+ public boolean lock() {
+ throw new UnsupportedOperationException("lock topicink");
+ }
+
+ @Override
+ public boolean unlock() {
+ throw new UnsupportedOperationException("unlock topic");
+ }
+
+ @Override
+ public boolean isLocked() {
+ throw new UnsupportedOperationException("topic isLocked");
+ }
+ }
+
+ /**
+ * Simulator for the feature-level factory.
+ */
+ private static class FeatureFactory extends PoolingFeature.Factory {
+
+ private final Context context;
+
+ /**
+ *
+ * @param context
+ */
+ public FeatureFactory(Context context) {
+ this.context = context;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public Properties getProperties(String featName) {
+ Properties props = new Properties();
+
+ props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+ props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
+ props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
+ props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
+ "" + STD_ACTIVE_HEARTBEAT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
+ props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
+ "" + STD_OFFLINE_PUB_WAIT_MS);
+
+ return props;
+ }
+
+ @Override
+ public PolicyController getController(DroolsController droolsController) {
+ return context.getController(droolsController);
+ }
+ }
+
+ /**
+ * Simulator for the pooling manager factory.
+ */
+ private static class ManagerFactory extends PoolingManagerImpl.Factory {
+
+ /**
+ * Used to decode events from the external topic.
+ */
+ private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
+ @Override
+ protected ObjectMapper initialValue() {
+ return new ObjectMapper();
+ }
+ };
+
+ /**
+ * Used to decode events into a Map.
+ */
+ private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
+
+ /**
+ *
+ * @param context
+ */
+ public ManagerFactory(Context context) {
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public boolean canDecodeEvent(DroolsController drools, String topic) {
+ return true;
+ }
+
+ @Override
+ public Object decodeEvent(DroolsController drools, String topic, String event) {
+ try {
+ return mapper.get().readValue(event, typeRef);
+
+ } catch (IOException e) {
+ logger.warn("cannot decode external event", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Simulator for the dmaap manager factory.
+ */
+ private static class DmaapFactory extends DmaapManager.Factory {
+
+ private final Context context;
+
+ /**
+ *
+ * @param context
+ */
+ public DmaapFactory(Context context) {
+ this.context = context;
+
+ /*
+ * Note: do NOT extract anything from "context" at this point, because it
+ * hasn't been fully initialized yet
+ */
+ }
+
+ @Override
+ public List<TopicSource> initTopicSources(Properties props) {
+ return Arrays.asList(new TopicSourceImpl(context, true));
+ }
+
+ @Override
+ public List<TopicSink> initTopicSinks(Properties props) {
+ return Arrays.asList(new TopicSinkImpl(context));
+ }
+ }
+
+ /**
+ * Controller that also implements the {@link TopicListener} interface.
+ */
+ private static interface ListenerController extends PolicyController, TopicListener {
+
+ }
+
+ /**
+ * Simple function that takes no arguments and returns nothing.
+ */
+ @FunctionalInterface
+ private static interface VoidFunction {
+
+ public void apply();
+ }
+}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
index 5b423d4b..34b604c9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureExceptionTest.java
@@ -31,12 +31,4 @@ public class PoolingFeatureExceptionTest extends ExceptionsTester {
assertEquals(5, test(PoolingFeatureException.class));
}
- @Test
- public void testToRuntimeException() {
- PoolingFeatureException plainExc = new PoolingFeatureException("hello");
- PoolingFeatureRtException runtimeExc = plainExc.toRuntimeException();
-
- assertTrue(plainExc == runtimeExc.getCause());
- }
-
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index cd1aea09..7782e475 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -40,12 +41,11 @@ import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.pooling.PoolingFeature.Factory;
import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
import org.onap.policy.drools.utils.Pair;
public class PoolingFeatureTest {
- private static final String CONFIG_DIR = "src/test/java/org/onap/policy/drools/pooling";
-
private static final String CONTROLLER1 = "controllerA";
private static final String CONTROLLER2 = "controllerB";
private static final String CONTROLLER_DISABLED = "controllerDisabled";
@@ -66,6 +66,8 @@ public class PoolingFeatureTest {
*/
private static Factory saveFactory;
+ private Properties props;
+ private PolicyEngine engine;
private PolicyController controller1;
private PolicyController controller2;
private PolicyController controllerDisabled;
@@ -94,6 +96,8 @@ public class PoolingFeatureTest {
@Before
public void setUp() throws Exception {
+ props = initProperties();
+ engine = mock(PolicyEngine.class);
factory = mock(Factory.class);
controller1 = mock(PolicyController.class);
controller2 = mock(PolicyController.class);
@@ -113,12 +117,13 @@ public class PoolingFeatureTest {
when(controllerException.getName()).thenReturn(CONTROLLER_EX);
when(controllerUnknown.getName()).thenReturn(CONTROLLER_UNKNOWN);
+ when(factory.getProperties(PoolingProperties.FEATURE_NAME)).thenReturn(props);
when(factory.getController(drools1)).thenReturn(controller1);
when(factory.getController(drools2)).thenReturn(controller2);
when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
when(factory.makeManager(any(), any())).thenAnswer(args -> {
- PoolingProperties props = args.getArgumentAt(1, PoolingProperties.class);
+ PoolingProperties props = args.getArgument(1);
PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
@@ -129,7 +134,7 @@ public class PoolingFeatureTest {
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR);
+ pool.beforeStart(engine);
pool.afterCreate(controller1);
pool.afterCreate(controller2);
@@ -149,24 +154,17 @@ public class PoolingFeatureTest {
}
@Test
- public void testGlobalInit() {
- pool = new PoolingFeature();
-
- pool.globalInit(null, CONFIG_DIR);
- }
-
- @Test(expected = PoolingFeatureRtException.class)
- public void testGlobalInit_NotFound() {
+ public void testBeforeStartEngine() {
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR + "/unknown");
+ assertFalse(pool.beforeStart(engine));
}
@Test
public void testAfterCreate() {
managers.clear();
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR);
+ pool.beforeStart(engine);
assertFalse(pool.afterCreate(controller1));
assertEquals(1, managers.size());
@@ -184,7 +182,7 @@ public class PoolingFeatureTest {
public void testAfterCreate_NotEnabled() {
managers.clear();
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR);
+ pool.beforeStart(engine);
assertFalse(pool.afterCreate(controllerDisabled));
assertTrue(managers.isEmpty());
@@ -194,7 +192,7 @@ public class PoolingFeatureTest {
public void testAfterCreate_PropertyEx() {
managers.clear();
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR);
+ pool.beforeStart(engine);
pool.afterCreate(controllerException);
}
@@ -202,9 +200,9 @@ public class PoolingFeatureTest {
@Test(expected = PoolingFeatureRtException.class)
public void testAfterCreate_NoProps() {
pool = new PoolingFeature();
-
+
// did not perform globalInit, which is an error
-
+
pool.afterCreate(controller1);
}
@@ -212,7 +210,7 @@ public class PoolingFeatureTest {
public void testAfterCreate_NoFeatProps() {
managers.clear();
pool = new PoolingFeature();
- pool.globalInit(null, CONFIG_DIR);
+ pool.beforeStart(engine);
assertFalse(pool.afterCreate(controllerUnknown));
assertTrue(managers.isEmpty());
@@ -492,4 +490,31 @@ public class PoolingFeatureTest {
pool.afterStop(controller1);
}
+ private Properties initProperties() {
+ Properties props = new Properties();
+
+ initProperties(props, "A", 0);
+ initProperties(props, "B", 1);
+ initProperties(props, "Exception", 2);
+
+ props.setProperty("pooling.controllerDisabled.enabled", "false");
+
+ props.setProperty("pooling.controllerException.offline.queue.limit", "INVALID NUMBER");
+
+ return props;
+ }
+
+ private void initProperties(Properties props, String suffix, int offset) {
+ props.setProperty("pooling.controller" + suffix + ".topic", "topic." + suffix);
+ props.setProperty("pooling.controller" + suffix + ".enabled", "true");
+ props.setProperty("pooling.controller" + suffix + ".offline.queue.limit", String.valueOf(5 + offset));
+ props.setProperty("pooling.controller" + suffix + ".offline.queue.age.milliseconds",
+ String.valueOf(100 + offset));
+ props.setProperty("pooling.controller" + suffix + ".start.heartbeat.milliseconds", String.valueOf(10 + offset));
+ props.setProperty("pooling.controller" + suffix + ".reactivate.milliseconds", String.valueOf(20 + offset));
+ props.setProperty("pooling.controller" + suffix + ".identification.milliseconds", String.valueOf(30 + offset));
+ props.setProperty("pooling.controller" + suffix + ".active.heartbeat.milliseconds",
+ String.valueOf(40 + offset));
+ props.setProperty("pooling.controller" + suffix + ".inter.heartbeat.milliseconds", String.valueOf(50 + offset));
+ }
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 01ee61ef..e32fa545 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -25,7 +25,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -40,7 +41,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -71,6 +71,7 @@ public class PoolingManagerImplTest {
protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+ protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
private static final String HOST2 = "other.host";
@@ -85,8 +86,8 @@ public class PoolingManagerImplTest {
private static final String REQUEST_ID = "my.request.id";
/**
- * Number of dmaap.publish() invocations that should be issued when the
- * manager is started.
+ * Number of dmaap.publish() invocations that should be issued when the manager is
+ * started.
*/
private static final int START_PUB = 1;
@@ -135,6 +136,7 @@ public class PoolingManagerImplTest {
when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+ when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
futures = new LinkedList<>();
ser = new Serializer();
@@ -180,11 +182,6 @@ public class PoolingManagerImplTest {
mgr = new PoolingManagerImpl(controller, poolProps);
}
- @After
- public void tearDown() throws Exception {
-
- }
-
@Test
public void testPoolingManagerImpl() {
mgr = new PoolingManagerImpl(controller, poolProps);
@@ -199,8 +196,8 @@ public class PoolingManagerImplTest {
@Test
public void testPoolingManagerImpl_ClassEx() {
/*
- * this controller does not implement TopicListener, which should cause
- * a ClassCastException
+ * this controller does not implement TopicListener, which should cause a
+ * ClassCastException
*/
PolicyController ctlr = mock(PolicyController.class);
@@ -316,6 +313,7 @@ public class PoolingManagerImplTest {
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
+ verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
}
@@ -362,7 +360,7 @@ public class PoolingManagerImplTest {
mgr.afterStop();
verify(eventQueue).clear();
- verify(dmaap).stopPublisher();
+ verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@Test
@@ -376,7 +374,7 @@ public class PoolingManagerImplTest {
mgr.afterStop();
verify(eventQueue, never()).clear();
- verify(dmaap).stopPublisher();
+ verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@Test
@@ -511,7 +509,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -539,7 +537,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -608,8 +606,7 @@ public class PoolingManagerImplTest {
StartState st = (StartState) mgr.getCurrent();
/*
- * give it its heart beat, that should cause it to transition to the
- * Query state.
+ * give it its heart beat, that should cause it to transition to the Query state.
*/
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
hb.setChannel(Message.ADMIN);
@@ -985,14 +982,35 @@ public class PoolingManagerImplTest {
}
@Test
+ public void testInject_Ex() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ // generate RuntimeException when onTopicEvent() is invoked
+ doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
+
+ CountDownLatch latch = catchRecursion(true);
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+
+ // ensure we made it past both beforeXxx() methods
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
public void testHandleInternal() throws Exception {
startMgr();
StartState st = (StartState) mgr.getCurrent();
/*
- * give it its heart beat, that should cause it to transition to the
- * Query state.
+ * give it its heart beat, that should cause it to transition to the Query state.
*/
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
hb.setChannel(Message.ADMIN);
@@ -1022,8 +1040,8 @@ public class PoolingManagerImplTest {
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
/*
- * do NOT set the channel - this will cause the message to be invalid,
- * triggering an exception
+ * do NOT set the channel - this will cause the message to be invalid, triggering
+ * an exception
*/
String msg = ser.encodeMsg(hb);
@@ -1068,7 +1086,7 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to this host
- mgr.startDistributing(makeAssignments(true));
+ assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS));
// all of the events should have been processed locally
verify(dmaap, times(START_PUB)).publish(any());
@@ -1088,7 +1106,7 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to the OTHER host
- mgr.startDistributing(makeAssignments(false));
+ assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
// all of the events should have been forwarded
verify(dmaap, times(4)).publish(any());
@@ -1140,7 +1158,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -1163,7 +1181,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -1208,15 +1226,14 @@ public class PoolingManagerImplTest {
}
/**
- * Configure the mock controller to act like a real controller, invoking
- * beforeOffer and then beforeInsert, so we can make sure they pass through.
- * We'll keep count to ensure we don't get into infinite recursion.
+ * Configure the mock controller to act like a real controller, invoking beforeOffer
+ * and then beforeInsert, so we can make sure they pass through. We'll keep count to
+ * ensure we don't get into infinite recursion.
*
- * @param invokeBeforeInsert {@code true} if beforeInsert() should be
- * invoked, {@code false} if it should be skipped
+ * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
+ * {@code false} if it should be skipped
*
- * @return a latch that will be counted down if both beforeXxx() methods
- * return false
+ * @return a latch that will be counted down if both beforeXxx() methods return false
*/
private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
CountDownLatch recursion = new CountDownLatch(3);
@@ -1230,9 +1247,9 @@ public class PoolingManagerImplTest {
}
int iarg = 0;
- CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class);
- String topic = args.getArgumentAt(iarg++, String.class);
- String event = args.getArgumentAt(iarg++, String.class);
+ CommInfrastructure proto = args.getArgument(iarg++);
+ String topic = args.getArgument(iarg++);
+ String event = args.getArgument(iarg++);
if (mgr.beforeOffer(proto, topic, event)) {
return null;
@@ -1253,9 +1270,8 @@ public class PoolingManagerImplTest {
/**
* Makes an assignment with two buckets.
*
- * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the
- * manager's bucket, {@code false} if it should hash to the other
- * host's bucket
+ * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
+ * manager's bucket, {@code false} if it should hash to the other host's bucket
* @return a new bucket assignment
*/
private BucketAssignments makeAssignments(boolean sameHost) {
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
index 63eb59d4..2d734c1c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
@@ -30,6 +30,7 @@ import static org.onap.policy.drools.pooling.PoolingProperties.IDENTIFICATION_MS
import static org.onap.policy.drools.pooling.PoolingProperties.INTER_HEARTBEAT_MS;
import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_AGE_MS;
import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_LIMIT;
+import static org.onap.policy.drools.pooling.PoolingProperties.OFFLINE_PUB_WAIT_MS;
import static org.onap.policy.drools.pooling.PoolingProperties.POOLING_TOPIC;
import static org.onap.policy.drools.pooling.PoolingProperties.REACTIVATE_MS;
import static org.onap.policy.drools.pooling.PoolingProperties.START_HEARTBEAT_MS;
@@ -53,6 +54,7 @@ public class PoolingPropertiesTest {
public static final long STD_LEADER_MS = 5000L;
public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L;
public static final long STD_INTER_HEARTBEAT_MS = 7000L;
+ public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L;
private Properties plain;
private PoolingProperties pooling;
@@ -121,10 +123,15 @@ public class PoolingPropertiesTest {
doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
}
+ @Test
+ public void testGetOfflinePubWaitMs() throws PropertyException {
+ doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
+ }
+
/**
- * Tests a particular property. Verifies that the correct value is returned
- * if the specialized property has a value or the property has no value.
- * Also verifies that the property name can be generalized.
+ * Tests a particular property. Verifies that the correct value is returned if the
+ * specialized property has a value or the property has no value. Also verifies that
+ * the property name can be generalized.
*
* @param propnm name of the property of interest
* @param specValue expected specialized value
@@ -140,8 +147,8 @@ public class PoolingPropertiesTest {
assertEquals("special " + propnm, specValue, func.apply(null));
/*
- * Ensure the property supports generalization - this will throw an
- * exception if it does not.
+ * Ensure the property supports generalization - this will throw an exception if
+ * it does not.
*/
assertFalse(propnm.equals(generalize(propnm)));
@@ -155,8 +162,8 @@ public class PoolingPropertiesTest {
}
/**
- * Makes a set of properties, where all of the properties are specialized
- * for the controller.
+ * Makes a set of properties, where all of the properties are specialized for the
+ * controller.
*
* @return a new property set
*/
@@ -172,6 +179,7 @@ public class PoolingPropertiesTest {
props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS);
props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS);
props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS);
+ props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
return props;
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
index 8b495099..505dc400 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
@@ -49,8 +49,8 @@ public class SpecPropertiesTest {
private static final String PREFIX_SPEC = PREFIX_GEN + MY_SPEC + ".";
/**
- * Suffix to add to property names to generate names of properties that are
- * not populated.
+ * Suffix to add to property names to generate names of properties that are not
+ * populated.
*/
private static final String SUFFIX = ".suffix";
@@ -175,6 +175,16 @@ public class SpecPropertiesTest {
assertNull(props.getProperty(gen(PROP_UNKNOWN), null));
}
+ @Test(expected = UnsupportedOperationException.class)
+ public void testHashCode() {
+ props.hashCode();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testEquals() {
+ props.equals(props);
+ }
+
private String gen(String propnm) {
if (propnm.startsWith(PREFIX_SPEC)) {
return PREFIX_GEN + propnm.substring(PREFIX_SPEC.length());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties
index a4b5bc76..3273a21e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/feature-pooling-dmaap.properties
@@ -1,3 +1,16 @@
+# Copyright 2018 AT&T Intellectual Property. All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
pooling.controllerA.topic = topic.A
pooling.controllerA.enabled = true
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
index ef03d4d6..c14e8dba 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/BucketAssignmentsTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
import org.junit.Test;
@@ -196,16 +197,21 @@ public class BucketAssignmentsTest {
String[] arr = {"def", "abc", "def", "ghi", "def", "def", "xyz"};
asgn.setHostArray(arr);
- for (int x = 0; x < arr.length; ++x) {
- assertEquals("x=" + x, arr[x], asgn.getAssignedHost(x));
+ /*
+ * get assignments for consecutive integers, including negative numbers and
+ * numbers extending past the length of the array.
+ *
+ */
+ TreeSet<String> seen = new TreeSet<>();
+ for (int x = -1; x < arr.length + 2; ++x) {
+ seen.add(asgn.getAssignedHost(x));
}
- // negative
- assertNull(asgn.getAssignedHost(-1));
+ TreeSet<String> expected = new TreeSet<>(Arrays.asList(arr));
+ assertEquals(expected, seen);
- // beyond end
- assertNull(asgn.getAssignedHost(arr.length));
- assertNull(asgn.getAssignedHost(arr.length + 1));
+ // try a much bigger number
+ assertNotNull(asgn.getAssignedHost(arr.length * 1000));
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index 7997a4ee..7b4b0602 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -39,11 +39,12 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
+import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.Triple;
public class ActiveStateTest extends BasicStateTester {
@@ -65,7 +66,7 @@ public class ActiveStateTest extends BasicStateTester {
// ensure a heart beat was generated
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.second.getSource());
+ assertEquals(MY_HOST, msg.second().getSource());
}
@Test
@@ -183,8 +184,8 @@ public class ActiveStateTest extends BasicStateTester {
/*
*
- * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader
- * thus should be ignored.
+ * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus
+ * should be ignored.
*/
assertNull(state.process(new Offline(PREV_HOST2)));
}
@@ -196,22 +197,57 @@ public class ActiveStateTest extends BasicStateTester {
state = new ActiveState(mgr);
/*
- * HOST1 has buckets, but it isn't the leader and it isn't my
- * predecessor, thus should be ignored.
+ * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus
+ * should be ignored.
*/
assertNull(state.process(new Offline(HOST1)));
}
@Test
- public void testProcessQuery() {
+ public void testProcessLeader_Invalid() {
+ Leader msg = new Leader(PREV_HOST, null);
+
+ // should stay in the same state, and not start distributing
+ assertNull(state.process(msg));
+ verify(mgr, never()).startDistributing(any());
+ verify(mgr, never()).goActive();
+ verify(mgr, never()).goInactive();
+
+ // info should be unchanged
+ assertEquals(MY_HOST, state.getLeader());
+ assertEquals(ASGN3, state.getAssignments());
+ }
+
+ @Test
+ public void testProcessLeader_BadLeader() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ // now send a Leader message for that leader
+ Leader msg = new Leader(HOST1, asgn);
+
State next = mock(State.class);
when(mgr.goQuery()).thenReturn(next);
- assertEquals(next, state.process(new Query()));
+ // should go Query, but not start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr, never()).startDistributing(asgn);
+ }
+
+ @Test
+ public void testProcessLeader_GoodLeader() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ // now send a Leader message for that leader
+ Leader msg = new Leader(PREV_HOST, asgn);
+
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
- Identification ident = captureAdminMessage(Identification.class);
- assertEquals(MY_HOST, ident.getSource());
- assertEquals(ASGN3, ident.getAssignments());
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
}
@Test
@@ -256,24 +292,24 @@ public class ActiveStateTest extends BasicStateTester {
// invoke start() to add the timers
state.start();
- assertEquals(3, repeatedFutures.size());
+ assertEquals(3, repeatedSchedules.size());
Triple<Long, Long, StateTimerTask> timer;
// heart beat generator
timer = repeatedTasks.remove();
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
// my heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
// predecessor's heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
}
@Test
@@ -285,19 +321,19 @@ public class ActiveStateTest extends BasicStateTester {
// invoke start() to add the timers
state.start();
- assertEquals(2, repeatedFutures.size());
+ assertEquals(2, repeatedSchedules.size());
Triple<Long, Long, StateTimerTask> timer;
// heart beat generator
timer = repeatedTasks.remove();
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first.longValue());
- assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second.longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
// my heart beat checker
timer = repeatedTasks.remove();
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.first.longValue());
- assertEquals(STD_INTER_HEARTBEAT_MS, timer.second.longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+ assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
}
@Test
@@ -314,14 +350,14 @@ public class ActiveStateTest extends BasicStateTester {
verify(mgr).publish(anyString(), any(Heartbeat.class));
// fire the task
- assertNull(task.third.fire(null));
+ assertNull(task.third().fire());
// should have generated a second pair of heart beats
verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first);
- assertEquals(MY_HOST, msg.second.getSource());
+ assertEquals(MY_HOST, msg.first());
+ assertEquals(MY_HOST, msg.second().getSource());
}
@Test
@@ -339,7 +375,7 @@ public class ActiveStateTest extends BasicStateTester {
when(mgr.goInactive()).thenReturn(next);
// fire the task - should not transition
- assertNull(task.third.fire(null));
+ assertNull(task.third().fire());
verify(mgr, never()).publishAdmin(any(Query.class));
}
@@ -356,7 +392,7 @@ public class ActiveStateTest extends BasicStateTester {
when(mgr.goInactive()).thenReturn(next);
// fire the task - should transition
- assertEquals(next, task.third.fire(null));
+ assertEquals(next, task.third().fire());
// should indicate failure
verify(mgr).internalTopicFailed();
@@ -381,7 +417,7 @@ public class ActiveStateTest extends BasicStateTester {
when(mgr.goQuery()).thenReturn(next);
// fire the task - should NOT transition
- assertNull(task.third.fire(null));
+ assertNull(task.third().fire());
verify(mgr, never()).publishAdmin(any(Query.class));
}
@@ -398,7 +434,7 @@ public class ActiveStateTest extends BasicStateTester {
when(mgr.goQuery()).thenReturn(next);
// fire the task - should transition
- assertEquals(next, task.third.fire(null));
+ assertEquals(next, task.third().fire());
verify(mgr).publishAdmin(any(Query.class));
}
@@ -414,8 +450,8 @@ public class ActiveStateTest extends BasicStateTester {
verify(mgr, times(1)).publish(any(), any());
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first);
- assertEquals(MY_HOST, msg.second.getSource());
+ assertEquals(MY_HOST, msg.first());
+ assertEquals(MY_HOST, msg.second().getSource());
}
@Test
@@ -429,13 +465,13 @@ public class ActiveStateTest extends BasicStateTester {
// this message should go to itself
msg = capturePublishedMessage(Heartbeat.class, index++);
- assertEquals(MY_HOST, msg.first);
- assertEquals(MY_HOST, msg.second.getSource());
+ assertEquals(MY_HOST, msg.first());
+ assertEquals(MY_HOST, msg.second().getSource());
// this message should go to its successor
msg = capturePublishedMessage(Heartbeat.class, index++);
- assertEquals(HOST1, msg.first);
- assertEquals(MY_HOST, msg.second.getSource());
+ assertEquals(HOST1, msg.first());
+ assertEquals(MY_HOST, msg.second().getSource());
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
index e48742f7..75ca7564 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/BasicStateTester.java
@@ -21,9 +21,9 @@
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,16 +33,18 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.Triple;
/**
- * Superclass used to test subclasses of {@link Message}.
+ * Superclass used to test subclasses of {@link State}.
*/
public class BasicStateTester {
@@ -74,9 +76,9 @@ public class BasicStateTester {
protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
/**
- * Futures returned by schedule().
+ * Scheduled tasks returned by schedule().
*/
- protected LinkedList<ScheduledFuture<?>> onceFutures;
+ protected LinkedList<CancellableScheduledTask> onceSchedules;
/**
* Tasks captured via schedule().
@@ -84,9 +86,9 @@ public class BasicStateTester {
protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
/**
- * Futures returned by scheduleWithFixedDelay().
+ * Scheduled tasks returned by scheduleWithFixedDelay().
*/
- protected LinkedList<ScheduledFuture<?>> repeatedFutures;
+ protected LinkedList<CancellableScheduledTask> repeatedSchedules;
/**
* Tasks captured via scheduleWithFixedDelay().
@@ -112,10 +114,10 @@ public class BasicStateTester {
}
public void setUp() throws Exception {
- onceFutures = new LinkedList<>();
+ onceSchedules = new LinkedList<>();
onceTasks = new LinkedList<>();
- repeatedFutures = new LinkedList<>();
+ repeatedSchedules = new LinkedList<>();
repeatedTasks = new LinkedList<>();
published = new LinkedList<>();
@@ -162,9 +164,9 @@ public class BasicStateTester {
Object[] args = invocation.getArguments();
onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
- ScheduledFuture<?> fut = mock(ScheduledFuture.class);
- onceFutures.add(fut);
- return fut;
+ CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
+ onceSchedules.add(sched);
+ return sched;
});
// capture scheduleWithFixedDelay() arguments, and return a new future
@@ -172,9 +174,9 @@ public class BasicStateTester {
Object[] args = invocation.getArguments();
repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
- ScheduledFuture<?> fut = mock(ScheduledFuture.class);
- repeatedFutures.add(fut);
- return fut;
+ CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
+ repeatedSchedules.add(sched);
+ return sched;
});
// get/set assignments in the manager
@@ -183,7 +185,7 @@ public class BasicStateTester {
when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
doAnswer(args -> {
- asgn.set(args.getArgumentAt(0, BucketAssignments.class));
+ asgn.set(args.getArgument(0));
return null;
}).when(mgr).startDistributing(any());
}
@@ -199,8 +201,7 @@ public class BasicStateTester {
}
/**
- * Captures the host array from the Leader message published to the admin
- * channel.
+ * Captures the host array from the Leader message published to the admin channel.
*
* @return the host array, as a list
*/
@@ -209,8 +210,7 @@ public class BasicStateTester {
}
/**
- * Captures the host array from the Leader message published to the admin
- * channel.
+ * Captures the host array from the Leader message published to the admin channel.
*
* @return the host array
*/
@@ -224,8 +224,7 @@ public class BasicStateTester {
}
/**
- * Captures the assignments from the Leader message published to the admin
- * channel.
+ * Captures the assignments from the Leader message published to the admin channel.
*
* @return the bucket assignments
*/
@@ -277,42 +276,6 @@ public class BasicStateTester {
*/
protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
Pair<String, Message> msg = published.get(index);
- return new Pair<>(msg.first, clazz.cast(msg.second));
+ return new Pair<>(msg.first(), clazz.cast(msg.second()));
}
-
- /**
- * Pair of values.
- *
- * @param <F> first value's type
- * @param <S> second value's type
- */
- public static class Pair<F, S> {
- public final F first;
- public final S second;
-
- public Pair(F first, S second) {
- this.first = first;
- this.second = second;
- }
- }
-
- /**
- * Pair of values.
- *
- * @param <F> first value's type
- * @param <S> second value's type
- * @param <T> third value's type
- */
- public static class Triple<F, S, T> {
- public final F first;
- public final S second;
- public final T third;
-
- public Triple(F first, S second, T third) {
- this.first = first;
- this.second = second;
- this.third = third;
- }
- }
-
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
index 96c59719..95cbe753 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -61,12 +61,6 @@ public class IdleStateTest extends BasicStateTester {
}
@Test
- public void testStop() {
- state.stop();
- verifyNothingPublished();
- }
-
- @Test
public void testProcessForward() {
Forward msg = new Forward();
assertNull(state.process(msg));
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
index 48d5b1ed..394adaee 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.utils.Pair;
public class InactiveStateTest extends BasicStateTester {
@@ -62,20 +63,20 @@ public class InactiveStateTest extends BasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_REACTIVATE_WAIT_MS, timer.first.longValue());
+ assertEquals(STD_REACTIVATE_WAIT_MS, timer.first().longValue());
// invoke the task - it should go to the state returned by the mgr
State next = mock(State.class);
when(mgr.goStart()).thenReturn(next);
- assertEquals(next, timer.second.fire(null));
+ assertEquals(next, timer.second().fire());
}
@Test
public void testInactiveState() {
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
index d60ad2ea..e1718418 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -38,16 +38,19 @@ import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
+import org.onap.policy.drools.pooling.state.ProcessingState.HostBucket;
public class ProcessingStateTest extends BasicStateTester {
private ProcessingState state;
+ private HostBucket hostBucket;
@Before
public void setUp() throws Exception {
super.setUp();
state = new ProcessingState(mgr, MY_HOST);
+ hostBucket = new HostBucket(MY_HOST);
}
@Test
@@ -62,6 +65,39 @@ public class ProcessingStateTest extends BasicStateTester {
}
@Test
+ public void testGoActive_WithAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(act, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testGoActive_WithoutAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ String[] arr = {HOST2, PREV_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+
+ assertEquals(inact, state.goActive(asgn));
+
+ verify(mgr).startDistributing(asgn);
+
+ }
+
+ @Test
public void testProcessQuery() {
State next = mock(State.class);
when(mgr.goQuery()).thenReturn(next);
@@ -97,8 +133,8 @@ public class ProcessingStateTest extends BasicStateTester {
state = new ProcessingState(mgr, LEADER);
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
@@ -260,8 +296,8 @@ public class ProcessingStateTest extends BasicStateTester {
@Test
public void testMakeBucketArray() {
/*
- * All hosts are still alive, so it should have the exact same
- * assignments as it had to start.
+ * All hosts are still alive, so it should have the exact same assignments as it
+ * had to start.
*/
state.setAssignments(ASGN3);
state.becomeLeader(sortHosts(HOST_ARR3));
@@ -325,4 +361,80 @@ public class ProcessingStateTest extends BasicStateTester {
assertEquals(Arrays.asList(expected), captureHostList());
}
+ @Test
+ public void testHostBucketRemove_testHostBucketAdd_testHostBucketSize() {
+ assertEquals(0, hostBucket.size());
+
+ hostBucket.add(20);
+ hostBucket.add(30);
+ hostBucket.add(40);
+ assertEquals(3, hostBucket.size());
+
+ assertEquals(20, hostBucket.remove().intValue());
+ assertEquals(30, hostBucket.remove().intValue());
+ assertEquals(1, hostBucket.size());
+
+ // add more before taking the last item
+ hostBucket.add(50);
+ hostBucket.add(60);
+ assertEquals(3, hostBucket.size());
+
+ assertEquals(40, hostBucket.remove().intValue());
+ assertEquals(50, hostBucket.remove().intValue());
+ assertEquals(60, hostBucket.remove().intValue());
+ assertEquals(0, hostBucket.size());
+
+ // add more AFTER taking the last item
+ hostBucket.add(70);
+ assertEquals(70, hostBucket.remove().intValue());
+ assertEquals(0, hostBucket.size());
+ }
+
+ @Test
+ public void testHostBucketCompareTo() {
+ HostBucket hb1 = new HostBucket(PREV_HOST);
+ HostBucket hb2 = new HostBucket(MY_HOST);
+
+ assertEquals(0, hb1.compareTo(hb1));
+ assertEquals(0, hb1.compareTo(new HostBucket(PREV_HOST)));
+
+ // both empty
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+
+ // hb1 has one bucket, so it should not be larger
+ hb1.add(100);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // hb1 has two buckets, so it should still be larger
+ hb1.add(200);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // hb1 has two buckets, hb2 has one, so hb1 should still be larger
+ hb2.add(1000);
+ assertTrue(hb1.compareTo(hb2) > 0);
+ assertTrue(hb2.compareTo(hb1) < 0);
+
+ // same number of buckets, so hb2 should now be larger
+ hb2.add(2000);
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+
+ // hb2 has more buckets, it should still be larger
+ hb2.add(3000);
+ assertTrue(hb1.compareTo(hb2) < 0);
+ assertTrue(hb2.compareTo(hb1) > 0);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testHostBucketHashCode() {
+ hostBucket.hashCode();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testHostBucketEquals() {
+ hostBucket.equals(hostBucket);
+ }
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index d714d5cc..a7c3a3d5 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -25,9 +25,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
@@ -38,7 +39,7 @@ import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.pooling.message.Query;
+import org.onap.policy.drools.utils.Pair;
public class QueryStateTest extends BasicStateTester {
@@ -63,44 +64,48 @@ public class QueryStateTest extends BasicStateTester {
}
@Test
+ public void testGoQuery() {
+ assertNull(state.goQuery());
+ }
+
+ @Test
public void testStart() {
state.start();
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
- assertNotNull(timer.second);
+ assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
+ assertNotNull(timer.second());
}
@Test
- public void testGoQuery() {
- assertNull(state.process(new Query()));
- assertEquals(ASGN3, state.getAssignments());
- }
+ public void testProcessIdentification_SameSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
- @Test
- public void testProcessIdentification_NullSource() {
- assertNull(state.process(new Identification()));
+ assertNull(state.process(new Identification(MY_HOST, asgn)));
+ // info should be unchanged
assertEquals(MY_HOST, state.getLeader());
+ verify(mgr, never()).startDistributing(asgn);
}
@Test
- public void testProcessIdentification_NewLeader() {
- assertNull(state.process(new Identification(PREV_HOST, null)));
-
- assertEquals(PREV_HOST, state.getLeader());
- }
+ public void testProcessIdentification_DiffSource() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
- @Test
- public void testProcessIdentification_NotNewLeader() {
- assertNull(state.process(new Identification(HOST2, null)));
+ assertNull(state.process(new Identification(HOST2, asgn)));
+ // leader should be unchanged
assertEquals(MY_HOST, state.getLeader());
+
+ // should have picked up the assignments
+ verify(mgr).startDistributing(asgn);
}
@Test
- public void testProcessLeader_NullAssignment() {
+ public void testProcessLeader_Invalid() {
Leader msg = new Leader(PREV_HOST, null);
// should stay in the same state, and not start distributing
@@ -115,67 +120,55 @@ public class QueryStateTest extends BasicStateTester {
}
@Test
- public void testProcessLeader_NullSource() {
+ public void testProcessLeader_SameLeader() {
String[] arr = {HOST2, PREV_HOST, MY_HOST};
BucketAssignments asgn = new BucketAssignments(arr);
- Leader msg = new Leader(null, asgn);
- // should stay in the same state, and not start distributing
- assertNull(state.process(msg));
- verify(mgr, never()).startDistributing(any());
- verify(mgr, never()).goActive();
- verify(mgr, never()).goInactive();
+ // identify a leader that's better than my host
+ assertEquals(null, state.process(new Identification(PREV_HOST, asgn)));
- // info should be unchanged
- assertEquals(MY_HOST, state.getLeader());
- assertEquals(ASGN3, state.getAssignments());
- }
+ // now send a Leader message for that leader
+ Leader msg = new Leader(PREV_HOST, asgn);
- @Test
- public void testProcessLeader_SourceIsNotAssignmentLeader() {
- String[] arr = {HOST2, PREV_HOST, MY_HOST};
- BucketAssignments asgn = new BucketAssignments(arr);
- Leader msg = new Leader(HOST2, asgn);
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
- // should stay in the same state, and not start distributing
- assertNull(state.process(msg));
- verify(mgr, never()).startDistributing(any());
- verify(mgr, never()).goActive();
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
verify(mgr, never()).goInactive();
- // info should be unchanged
- assertEquals(MY_HOST, state.getLeader());
- assertEquals(ASGN3, state.getAssignments());
+ // Ident msg + Leader msg = times(2)
+ verify(mgr, times(2)).startDistributing(asgn);
}
@Test
- public void testProcessLeader_EmptyAssignment() {
- Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+ public void testProcessLeader_BetterLeaderWithAssignment() {
+ String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(PREV_HOST, asgn);
- // should stay in the same state, and not start distributing
- assertNull(state.process(msg));
- verify(mgr, never()).startDistributing(any());
- verify(mgr, never()).goActive();
- verify(mgr, never()).goInactive();
+ State next = mock(State.class);
+ when(mgr.goActive()).thenReturn(next);
- // info should be unchanged
- assertEquals(MY_HOST, state.getLeader());
- assertEquals(ASGN3, state.getAssignments());
+ // should go Active and start distributing
+ assertEquals(next, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ verify(mgr, never()).goInactive();
}
@Test
- public void testProcessLeader_BetterLeader() {
- String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ public void testProcessLeader_BetterLeaderWithoutAssignment() {
+ String[] arr = {HOST2, PREV_HOST, HOST1};
BucketAssignments asgn = new BucketAssignments(arr);
Leader msg = new Leader(PREV_HOST, asgn);
State next = mock(State.class);
- when(mgr.goActive()).thenReturn(next);
+ when(mgr.goInactive()).thenReturn(next);
- // should go Active and start distributing
+ // should go Inactive, but start distributing
assertEquals(next, state.process(msg));
verify(mgr).startDistributing(asgn);
- verify(mgr, never()).goInactive();
+ verify(mgr, never()).goActive();
}
@Test
@@ -241,41 +234,48 @@ public class QueryStateTest extends BasicStateTester {
}
@Test
- public void testProcessQuery() {
- BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
- mgr.startDistributing(asgn);
- state = new QueryState(mgr);
-
- State next = mock(State.class);
- when(mgr.goQuery()).thenReturn(next);
-
- assertEquals(null, state.process(new Query()));
-
- verify(mgr).publishAdmin(any(Identification.class));
- }
-
- @Test
public void testQueryState() {
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
@Test
+ public void testAwaitIdentification_MissingSelfIdent() {
+ state.start();
+
+ Pair<Long, StateTimerTask> timer = onceTasks.remove();
+
+ assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
+ assertNotNull(timer.second());
+
+ // should published an Offline message and go inactive
+
+ State next = mock(State.class);
+ when(mgr.goInactive()).thenReturn(next);
+
+ assertEquals(next, timer.second().fire());
+
+ Offline msg = captureAdminMessage(Offline.class);
+ assertEquals(MY_HOST, msg.getSource());
+ }
+
+ @Test
public void testAwaitIdentification_Leader() {
state.start();
+ state.process(new Identification(MY_HOST, null));
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
- assertNotNull(timer.second);
+ assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
+ assertNotNull(timer.second());
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
- assertEquals(next, timer.second.fire(null));
+ assertEquals(next, timer.second().fire());
// should have published a Leader message
Leader msg = captureAdminMessage(Leader.class);
@@ -291,20 +291,21 @@ public class QueryStateTest extends BasicStateTester {
state = new QueryState(mgr);
state.start();
+ state.process(new Identification(MY_HOST, null));
// tell it the leader is still active
state.process(new Identification(PREV_HOST, asgn));
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
- assertNotNull(timer.second);
+ assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
+ assertNotNull(timer.second());
// set up active state, as that's what it should return
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
- assertEquals(next, timer.second.fire(null));
+ assertEquals(next, timer.second().fire());
// should NOT have published a Leader message
assertTrue(admin.isEmpty());
@@ -321,20 +322,21 @@ public class QueryStateTest extends BasicStateTester {
state = new QueryState(mgr);
state.start();
+ state.process(new Identification(MY_HOST, null));
// tell it the leader is still active
state.process(new Identification(PREV_HOST, asgn));
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first.longValue());
- assertNotNull(timer.second);
+ assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
+ assertNotNull(timer.second());
// set up inactive state, as that's what it should return
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
- assertEquals(next, timer.second.fire(null));
+ assertEquals(next, timer.second().fire());
// should NOT have published a Leader message
assertTrue(admin.isEmpty());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index f29d2348..af4e8f13 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -23,7 +23,7 @@ package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -38,6 +38,7 @@ import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
+import org.onap.policy.drools.utils.Pair;
public class StartStateTest extends BasicStateTester {
@@ -74,18 +75,18 @@ public class StartStateTest extends BasicStateTester {
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first);
- assertEquals(state.getHbTimestampMs(), msg.second.getTimestampMs());
+ assertEquals(MY_HOST, msg.first());
+ assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs());
Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
- assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first.longValue());
+ assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue());
// invoke the task - it should go to the state returned by the mgr
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
- assertEquals(next, timer.second.fire(null));
+ assertEquals(next, timer.second().fire());
verify(mgr).internalTopicFailed();
}
@@ -93,8 +94,8 @@ public class StartStateTest extends BasicStateTester {
@Test
public void testStartStatePoolingManager() {
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
@@ -105,8 +106,8 @@ public class StartStateTest extends BasicStateTester {
state = new StartState(mgr);
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index 1be48e21..a184dfad 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -21,16 +21,18 @@
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
import org.junit.Before;
import org.junit.Test;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
@@ -55,8 +57,8 @@ public class StateTest extends BasicStateTester {
@Test
public void testStatePoolingManager() {
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
@@ -67,8 +69,8 @@ public class StateTest extends BasicStateTester {
state = new MyState(mgr);
/*
- * Prove the state is attached to the manager by invoking getHost(),
- * which delegates to the manager.
+ * Prove the state is attached to the manager by invoking getHost(), which
+ * delegates to the manager.
*/
assertEquals(MY_HOST, state.getHost());
}
@@ -98,13 +100,13 @@ public class StateTest extends BasicStateTester {
verify(mgr).schedule(delay, task2);
verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
- ScheduledFuture<?> fut1 = onceFutures.removeFirst();
- ScheduledFuture<?> fut2 = onceFutures.removeFirst();
- ScheduledFuture<?> fut3 = repeatedFutures.removeFirst();
+ CancellableScheduledTask sched1 = onceSchedules.removeFirst();
+ CancellableScheduledTask sched2 = onceSchedules.removeFirst();
+ CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
- verify(fut1, never()).cancel(false);
- verify(fut2, never()).cancel(false);
- verify(fut3, never()).cancel(false);
+ verify(sched1, never()).cancel();
+ verify(sched2, never()).cancel();
+ verify(sched3, never()).cancel();
/*
* Cancel the timers.
@@ -112,9 +114,9 @@ public class StateTest extends BasicStateTester {
state.cancelTimers();
// verify that all were cancelled
- verify(fut1).cancel(false);
- verify(fut2).cancel(false);
- verify(fut3).cancel(false);
+ verify(sched1).cancel();
+ verify(sched2).cancel();
+ verify(sched3).cancel();
}
@Test
@@ -134,13 +136,6 @@ public class StateTest extends BasicStateTester {
}
@Test
- public void testStop() {
- state.stop();
-
- assertEquals(MY_HOST, captureAdminMessage(Offline.class).getSource());
- }
-
- @Test
public void testGoStart() {
State next = mock(State.class);
when(mgr.goStart()).thenReturn(next);
@@ -195,7 +190,18 @@ public class StateTest extends BasicStateTester {
}
@Test
- public void testProcessLeader_NullAssignment() {
+ public void testProcessLeader() {
+ String[] arr = {HOST2, HOST1};
+ BucketAssignments asgn = new BucketAssignments(arr);
+ Leader msg = new Leader(HOST1, asgn);
+
+ // should ignore it
+ assertEquals(null, state.process(msg));
+ verify(mgr).startDistributing(asgn);
+ }
+
+ @Test
+ public void testProcessLeader_Invalid() {
Leader msg = new Leader(PREV_HOST, null);
// should stay in the same state, and not start distributing
@@ -206,57 +212,44 @@ public class StateTest extends BasicStateTester {
}
@Test
- public void testProcessLeader_NullSource() {
+ public void testIsValidLeader_NullAssignment() {
+ assertFalse(state.isValid(new Leader(PREV_HOST, null)));
+ }
+
+ @Test
+ public void testIsValidLeader_NullSource() {
String[] arr = {HOST2, PREV_HOST, MY_HOST};
BucketAssignments asgn = new BucketAssignments(arr);
- Leader msg = new Leader(null, asgn);
+ assertFalse(state.isValid(new Leader(null, asgn)));
+ }
- // should stay in the same state, and not start distributing
- assertNull(state.process(msg));
- verify(mgr, never()).startDistributing(any());
- verify(mgr, never()).goActive();
- verify(mgr, never()).goInactive();
+ @Test
+ public void testIsValidLeader_EmptyAssignment() {
+ assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
}
@Test
- public void testProcessLeader_EmptyAssignment() {
- Leader msg = new Leader(PREV_HOST, new BucketAssignments());
+ public void testIsValidLeader_FromSelf() {
+ String[] arr = {HOST2, MY_HOST};
+ BucketAssignments asgn = new BucketAssignments(arr);
- // should stay in the same state, and not start distributing
- assertNull(state.process(msg));
- verify(mgr, never()).startDistributing(any());
- verify(mgr, never()).goActive();
- verify(mgr, never()).goInactive();
+ assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
}
@Test
- public void testProcessLeader_MyHostAssigned() {
- String[] arr = {HOST2, PREV_HOST, MY_HOST};
+ public void testIsValidLeader_WrongLeader() {
+ String[] arr = {HOST2, HOST3};
BucketAssignments asgn = new BucketAssignments(arr);
- Leader msg = new Leader(PREV_HOST, asgn);
-
- State next = mock(State.class);
- when(mgr.goActive()).thenReturn(next);
- // should go Active and start distributing
- assertEquals(next, state.process(msg));
- verify(mgr).startDistributing(asgn);
- verify(mgr, never()).goInactive();
+ assertFalse(state.isValid(new Leader(HOST1, asgn)));
}
@Test
- public void testProcessLeader_MyHostUnassigned() {
+ public void testIsValidLeader() {
String[] arr = {HOST2, HOST1};
BucketAssignments asgn = new BucketAssignments(arr);
- Leader msg = new Leader(HOST1, asgn);
- State next = mock(State.class);
- when(mgr.goInactive()).thenReturn(next);
-
- // should go Inactive and start distributing
- assertEquals(next, state.process(msg));
- verify(mgr).startDistributing(asgn);
- verify(mgr, never()).goActive();
+ assertTrue(state.isValid(new Leader(HOST1, asgn)));
}
@Test
@@ -344,18 +337,18 @@ public class StateTest extends BasicStateTester {
state.schedule(delay, task);
- ScheduledFuture<?> fut = onceFutures.removeFirst();
+ CancellableScheduledTask sched = onceSchedules.removeFirst();
// scheduled, but not canceled yet
verify(mgr).schedule(delay, task);
- verify(fut, never()).cancel(false);
+ verify(sched, never()).cancel();
/*
- * Ensure the state added the timer to its list by telling it to cancel
- * its timers and then seeing if this timer was canceled.
+ * Ensure the state added the timer to its list by telling it to cancel its timers
+ * and then seeing if this timer was canceled.
*/
state.cancelTimers();
- verify(fut).cancel(false);
+ verify(sched).cancel();
}
@Test
@@ -367,18 +360,18 @@ public class StateTest extends BasicStateTester {
state.scheduleWithFixedDelay(initdel, delay, task);
- ScheduledFuture<?> fut = repeatedFutures.removeFirst();
+ CancellableScheduledTask sched = repeatedSchedules.removeFirst();
// scheduled, but not canceled yet
verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
- verify(fut, never()).cancel(false);
+ verify(sched, never()).cancel();
/*
- * Ensure the state added the timer to its list by telling it to cancel
- * its timers and then seeing if this timer was canceled.
+ * Ensure the state added the timer to its list by telling it to cancel its timers
+ * and then seeing if this timer was canceled.
*/
state.cancelTimers();
- verify(fut).cancel(false);
+ verify(sched).cancel();
}
@Test