summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties89
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java63
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java67
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java44
-rw-r--r--feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI (renamed from feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI)0
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java84
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java23
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java63
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java29
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java6
15 files changed, 217 insertions, 281 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
new file mode 100644
index 00000000..64a6b063
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -0,0 +1,89 @@
+###
+# ============LICENSE_START=======================================================
+# feature-pooling-dmaap
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+# In general, the feature-specific properties begin with "pooling",
+# and they may be made specific to a controller by prepending with
+# "pooling.<controller-name>", instead.
+#
+# The available properties and their default values are shown below.
+
+# Whether or not the feature is enabled.
+#pooling.enabled=false
+
+# The internal DMaaP topic used by a controller. Note: the controller
+# name is required for this property.
+#pooling.<controller-name>.topic =
+
+# Maximum number of events to retain in the queue while a new host waits
+# to be assigned work.
+#pooling.offline.queue.limit=1000
+
+# Maximum age, in milliseconds, of events to be retained in the queue.
+# Events older than this are discarded.
+#pooling.offline.queue.age.milliseconds=60000
+
+# Time, in milliseconds, to wait for an "Offline" message to be published
+# to DMaaP before the connection may be closed.
+#pooling.offline.publish.wait.milliseconds=3000
+
+# Time, in milliseconds, to wait for this host's initial heart beat. This
+# is used to verify connectivity to the internal DMaaP topic.
+#pooling.start.heartbeat.milliseconds=100000
+
+# Time, in milliseconds, to wait before attempting to re-active this
+# host when it was not assigned any work.
+#pooling.reactivate.milliseconds=50000
+
+# Time, in milliseconds, to wait for other hosts to identify themselves
+# when this host is started.
+#pooling.identification.milliseconds=50000
+
+# Time, in milliseconds, to wait for heart beats from this host, or its
+# predecessor, during the active state.
+#pooling.active.heartbeat.milliseconds=50000
+
+# Time, in milliseconds, to wait between heart beat generations.
+#pooling.inter.heartbeat.milliseconds=15000
+
+# Topic used for inter-host communication for a particular controller
+# pooling.<controller-name>.topic=XXX
+
+# These specify how the request id is to be extracted from each type of
+# object that may be presented to a controller from distributed topics
+# (i.e., topics whose events are to be distributed among multiple hosts)
+extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
+extractor.requestId.org.onap.policy.appc.Response=${commonHeader.requestId}
+extractor.requestId.org.onap.policy.appclcm.LcmResponseWrapper=${body.commonHeader.requestId}
+
+
+pooling.amsterdam.enabled=true
+pooling.amsterdam.topic=${{AMSTERDAM_POOLING_TOPIC}}
+
+# the list of sources and sinks should be identical
+ueb.source.topics=${{AMSTERDAM_POOLING_TOPIC}}
+ueb.sink.topics=${{AMSTERDAM_POOLING_TOPIC}}
+
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}}
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey=
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret=
+
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}}
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey=
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index abb4da33..eb41f803 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import java.util.Properties;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicEndpoint;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -31,7 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the internal DMaaP topic.
+ * Manages the internal DMaaP topic. Assumes all topics are managed by
+ * {@link TopicEndpoint#manager}.
*/
public class DmaapManager {
@@ -58,18 +58,6 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSource> sources;
-
- /**
- * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSink> sinks;
-
- /**
* {@code True} if the consumer is running, {@code false} otherwise.
*/
private boolean consuming = false;
@@ -83,17 +71,14 @@ public class DmaapManager {
* Constructs the manager, but does not start the source or sink.
*
* @param topic name of the internal DMaaP topic
- * @param props properties to configure the topic source & sink
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ public DmaapManager(String topic) throws PoolingFeatureException {
logger.info("initializing bus for topic {}", topic);
try {
this.topic = topic;
- this.sources = factory.initTopicSources(props);
- this.sinks = factory.initTopicSinks(props);
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
@@ -132,7 +117,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
- for (TopicSource src : sources) {
+ for (TopicSource src : factory.getTopicSources()) {
if (topic.equals(src.getTopic())) {
if (src instanceof FilterableTopicSource) {
return (FilterableTopicSource) src;
@@ -153,7 +138,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the sink doesn't exist
*/
private TopicSink findTopicSink() throws PoolingFeatureException {
- for (TopicSink sink : sinks) {
+ for (TopicSink sink : factory.getTopicSinks()) {
if (topic.equals(sink.getTopic())) {
return sink;
}
@@ -164,22 +149,14 @@ public class DmaapManager {
/**
* Starts the publisher, if it isn't already running.
- *
- * @throws PoolingFeatureException if an error occurs
*/
- public void startPublisher() throws PoolingFeatureException {
+ public void startPublisher() {
if (publishing) {
return;
}
- try {
- logger.info("start publishing to topic {}", topic);
- topicSink.start();
- publishing = true;
-
- } catch (IllegalStateException e) {
- throw new PoolingFeatureException("cannot start topic sink " + topic, e);
- }
+ logger.info("start publishing to topic {}", topic);
+ publishing = true;
}
/**
@@ -205,14 +182,8 @@ public class DmaapManager {
Thread.currentThread().interrupt();
}
- try {
- logger.info("stop publishing to topic {}", topic);
- publishing = false;
- topicSink.stop();
-
- } catch (IllegalStateException e) {
- logger.error("cannot stop sink for topic {}", topic, e);
- }
+ logger.info("stop publishing to topic {}", topic);
+ publishing = false;
}
/**
@@ -288,23 +259,17 @@ public class DmaapManager {
public static class Factory {
/**
- * Initializes the topic sources.
- *
- * @param props properties used to configure the topics
* @return the topic sources
*/
- public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ public List<TopicSource> getTopicSources() {
+ return TopicEndpoint.manager.getTopicSources();
}
/**
- * Initializes the topic sinks.
- *
- * @param props properties used to configure the topics
* @return the topic sinks
*/
- public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ public List<TopicSink> getTopicSinks() {
+ return TopicEndpoint.manager.getTopicSinks();
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 1e2071ab..67cb21ee 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -20,13 +20,16 @@
package org.onap.policy.drools.pooling;
+import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
@@ -53,7 +56,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Factory used to create objects.
*/
- private static Factory factory;
+ private static Factory factory = new Factory();
/**
* ID of this host.
@@ -124,6 +127,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public boolean beforeStart(PolicyEngine engine) {
logger.info("initializing " + PoolingProperties.FEATURE_NAME);
featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
+
+ factory.initTopicSources(featProps);
+ factory.initTopicSinks(featProps);
+
return false;
}
@@ -189,17 +196,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean afterStop(PolicyController controller) {
-
- // NOTE: using doDeleteManager() instead of doManager()
-
- return doDeleteManager(controller, mgr -> {
-
+ return doManager(controller, mgr -> {
mgr.afterStop();
return false;
});
}
@Override
+ public boolean afterShutdown(PolicyController controller) {
+ deleteManager(controller);
+ return false;
+ }
+
+ @Override
+ public boolean afterHalt(PolicyController controller) {
+ deleteManager(controller);
+ return false;
+ }
+
+ @Override
public boolean beforeLock(PolicyController controller) {
return doManager(controller, mgr -> {
mgr.beforeLock();
@@ -306,29 +321,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
/**
- * Executes a function using the manager associated with the controller and then
- * deletes the manager. Catches any exceptions from the function and re-throws it as a
- * runtime exception.
+ * Deletes the manager associated with a controller.
*
* @param controller
- * @param func function to be executed
- * @return {@code true} if the function handled the request, {@code false} otherwise
* @throws PoolingFeatureRtException if an error occurs
*/
- private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
+ private void deleteManager(PolicyController controller) {
String name = controller.getName();
logger.info("remove feature-pool-dmaap manager for {}", name);
- // NOTE: using "remove()" instead of "get()"
-
- PoolingManagerImpl mgr = ctlr2pool.remove(name);
-
- if (mgr == null) {
- return false;
- }
-
- return func.apply(mgr);
+ ctlr2pool.remove(name);
}
/**
@@ -416,5 +419,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public PolicyController getController(DroolsController droolsController) {
return PolicyController.factory.get(droolsController);
}
+
+ /**
+ * Initializes the topic sources.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sources
+ */
+ public List<TopicSource> initTopicSources(Properties props) {
+ return TopicEndpoint.manager.addTopicSources(props);
+ }
+
+ /**
+ * Initializes the topic sinks.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sinks
+ */
+ public List<TopicSink> initTopicSinks(Properties props) {
+ return TopicEndpoint.manager.addTopicSinks(props);
+ }
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index de25e471..33f45085 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -45,7 +45,6 @@ import org.onap.policy.drools.pooling.state.QueryState;
import org.onap.policy.drools.pooling.state.StartState;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
-import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
@@ -177,8 +176,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
- this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
- makeDmaapProps(controller, props.getSource()));
+ this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -237,39 +235,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring DMaaP. Copies properties from the source that
- * start with the Pooling property prefix followed by the controller name, stripping
- * the prefix and controller name.
- *
- * @param controller the controller for which DMaaP will be configured
- * @param source properties from which to get the DMaaP properties
- * @return DMaaP properties
- */
- private Properties makeDmaapProps(PolicyController controller, Properties source) {
- SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
-
- // could be UEB or DMAAP, so add both
- addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
- addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-
- return specProps;
- }
-
- /**
- * Adds DMaaP consumer properties, consumer group & instance. The group is the host
- * and the instance is a constant.
- *
- * @param props where to add the new properties
- * @param prefix property prefix
- */
- private void addDmaapConsumerProps(SpecProperties props, String prefix) {
- String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-
- props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
- props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*
@@ -325,6 +290,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
dmaapMgr.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
+
+ assignments = null;
}
if (sched != null) {
@@ -902,12 +869,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Creates a DMaaP manager.
*
* @param topic name of the internal DMaaP topic
- * @param props properties used to configure DMaaP
* @return a new DMaaP manager
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
- return new DmaapManager(topic, props);
+ public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
+ return new DmaapManager(topic);
}
/**
diff --git a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
index cd59e469..cd59e469 100644
--- a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI
+++ b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index a5688df6..6509e90e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -24,16 +24,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.LinkedList;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.junit.AfterClass;
import org.junit.Before;
@@ -56,7 +54,6 @@ public class DmaapManagerTest {
*/
private static Factory saveFactory;
- private Properties props;
private Factory factory;
private TopicListener listener;
private FilterableTopicSource source;
@@ -75,8 +72,6 @@ public class DmaapManagerTest {
@Before
public void setUp() throws Exception {
- props = new Properties();
-
listener = mock(TopicListener.class);
factory = mock(Factory.class);
source = mock(FilterableTopicSource.class);
@@ -90,21 +85,21 @@ public class DmaapManagerTest {
when(sink.send(any())).thenReturn(true);
// three sources, with the desired one in the middle
- when(factory.initTopicSources(props))
+ when(factory.getTopicSources())
.thenReturn(Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class)));
// three sinks, with the desired one in the middle
- when(factory.initTopicSinks(props))
+ when(factory.getTopicSinks())
.thenReturn(Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class)));
- mgr = new DmaapManager(MY_TOPIC, props);
+ mgr = new DmaapManager(MY_TOPIC);
}
@Test
public void testDmaapManager() {
// verify that the init methods were called
- verify(factory).initTopicSinks(props);
- verify(factory).initTopicSinks(props);
+ verify(factory).getTopicSinks();
+ verify(factory).getTopicSinks();
}
@Test(expected = PoolingFeatureException.class)
@@ -112,15 +107,15 @@ public class DmaapManagerTest {
// force error by having no topics match
when(source.getTopic()).thenReturn("");
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test(expected = PoolingFeatureException.class)
public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException {
// force error
- when(factory.initTopicSources(props)).thenThrow(new IllegalArgumentException("expected"));
+ when(factory.getTopicSources()).thenThrow(new IllegalArgumentException("expected"));
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test(expected = PoolingFeatureException.class)
@@ -128,7 +123,7 @@ public class DmaapManagerTest {
// force an error when setFilter() is called
doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test
@@ -148,25 +143,25 @@ public class DmaapManagerTest {
TopicSource source2 = mock(TopicSource.class);
when(source2.getTopic()).thenReturn(MY_TOPIC);
- when(factory.initTopicSources(props)).thenReturn(Arrays.asList(source2));
+ when(factory.getTopicSources()).thenReturn(Arrays.asList(source2));
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test(expected = PoolingFeatureException.class)
public void testFindTopicSource_NotFound() throws PoolingFeatureException {
// one item in list, and its topic doesn't match
- when(factory.initTopicSources(props)).thenReturn(Arrays.asList(mock(TopicSource.class)));
+ when(factory.getTopicSources()).thenReturn(Arrays.asList(mock(TopicSource.class)));
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test(expected = PoolingFeatureException.class)
public void testFindTopicSource_EmptyList() throws PoolingFeatureException {
// empty list
- when(factory.initTopicSources(props)).thenReturn(new LinkedList<>());
+ when(factory.getTopicSources()).thenReturn(new LinkedList<>());
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test
@@ -177,50 +172,26 @@ public class DmaapManagerTest {
@Test(expected = PoolingFeatureException.class)
public void testFindTopicSink_NotFound() throws PoolingFeatureException {
// one item in list, and its topic doesn't match
- when(factory.initTopicSinks(props)).thenReturn(Arrays.asList(mock(TopicSink.class)));
+ when(factory.getTopicSinks()).thenReturn(Arrays.asList(mock(TopicSink.class)));
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test(expected = PoolingFeatureException.class)
public void testFindTopicSink_EmptyList() throws PoolingFeatureException {
// empty list
- when(factory.initTopicSinks(props)).thenReturn(new LinkedList<>());
+ when(factory.getTopicSinks()).thenReturn(new LinkedList<>());
- new DmaapManager(MY_TOPIC, props);
+ new DmaapManager(MY_TOPIC);
}
@Test
public void testStartPublisher() throws PoolingFeatureException {
- // not started yet
- verify(sink, never()).start();
-
+
mgr.startPublisher();
- verify(sink).start();
// restart should have no effect
mgr.startPublisher();
- verify(sink).start();
-
- // should be able to publish now
- mgr.publish(MSG);
- verify(sink).send(MSG);
- }
-
- @Test
- public void testStartPublisher_Exception() throws PoolingFeatureException {
- // force exception when it starts
- doThrow(new IllegalStateException("expected")).when(sink).start();
-
- expectException("startPublisher,start", () -> mgr.startPublisher());
- expectException("startPublisher,publish", () -> mgr.publish(MSG));
-
- // allow it to succeed this time
- reset(sink);
- when(sink.send(any())).thenReturn(true);
-
- mgr.startPublisher();
- verify(sink).start();
// should be able to publish now
mgr.publish(MSG);
@@ -231,18 +202,15 @@ public class DmaapManagerTest {
public void testStopPublisher() throws PoolingFeatureException {
// not publishing yet, so stopping should have no effect
mgr.stopPublisher(0);
- verify(sink, never()).stop();
// now start it
mgr.startPublisher();
// this time, stop should do something
mgr.stopPublisher(0);
- verify(sink).stop();
// re-stopping should have no effect
mgr.stopPublisher(0);
- verify(sink).stop();
}
@Test
@@ -285,16 +253,6 @@ public class DmaapManagerTest {
}
@Test
- public void testStopPublisher_Exception() throws PoolingFeatureException {
- mgr.startPublisher();
-
- // force exception when it stops
- doThrow(new IllegalStateException("expected")).when(sink).stop();
-
- mgr.stopPublisher(0);
- }
-
- @Test
public void testStartConsumer() {
// not started yet
verify(source, never()).register(any());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index cc588384..d453e746 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -874,11 +874,6 @@ public class FeatureTest {
private static class TopicImpl implements Topic {
/**
- * {@code True} if this topic is alive/running, {@code false} otherwise.
- */
- private boolean alive = false;
-
- /**
*
*/
public TopicImpl() {
@@ -917,32 +912,22 @@ public class FeatureTest {
@Override
public synchronized boolean start() {
- if (alive) {
- throw new IllegalStateException("topic already started");
- }
-
- alive = true;
return true;
}
@Override
public synchronized boolean stop() {
- if (!alive) {
- throw new IllegalStateException("topic is not running");
- }
-
- alive = false;
return true;
}
@Override
public synchronized void shutdown() {
- alive = false;
+ // do nothing
}
@Override
public synchronized boolean isAlive() {
- return alive;
+ return true;
}
@Override
@@ -1081,12 +1066,12 @@ public class FeatureTest {
}
@Override
- public List<TopicSource> initTopicSources(Properties props) {
+ public List<TopicSource> getTopicSources() {
return Arrays.asList(new TopicSourceImpl(context, true));
}
@Override
- public List<TopicSink> initTopicSinks(Properties props) {
+ public List<TopicSink> getTopicSinks() {
return Arrays.asList(new TopicSinkImpl(context));
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index f8f37559..32264e3a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -271,11 +271,28 @@ public class PoolingFeatureTest {
assertFalse(pool.afterStop(controller1));
verify(mgr1).afterStop();
- // ensure it has been removed from the map by re-invoking
- assertFalse(pool.afterStop(controller1));
-
+ assertFalse(pool.afterStop(controllerDisabled));
+
// count should be unchanged
verify(mgr1).afterStop();
+ }
+
+ @Test
+ public void testAfterHalt() {
+ assertFalse(pool.afterHalt(controller1));
+ assertFalse(pool.afterHalt(controller1));
+
+ verify(mgr1, never()).afterStop();
+
+ assertFalse(pool.afterStop(controllerDisabled));
+ }
+
+ @Test
+ public void testAfterShutdown() {
+ assertFalse(pool.afterShutdown(controller1));
+ assertFalse(pool.afterShutdown(controller1));
+
+ verify(mgr1, never()).afterStop();
assertFalse(pool.afterStop(controllerDisabled));
}
@@ -464,46 +481,6 @@ public class PoolingFeatureTest {
pool.beforeStart(controller1);
}
- @Test
- public void testDoDeleteManager() {
- assertFalse(pool.afterStop(controller1));
- verify(mgr1).afterStop();
-
- // ensure it has been removed from the map by re-invoking
- assertFalse(pool.afterStop(controller1));
-
- // count should be unchanged
- verify(mgr1).afterStop();
-
-
- // different controller
- assertFalse(pool.afterStop(controller2));
- verify(mgr2).afterStop();
-
- // ensure it has been removed from the map by re-invoking
- assertFalse(pool.afterStop(controller2));
-
- // count should be unchanged
- verify(mgr2).afterStop();
-
-
- assertFalse(pool.afterStop(controllerDisabled));
- }
-
- @Test
- public void testDoDeleteManager_NotFound() {
- assertFalse(pool.afterStop(controllerDisabled));
- }
-
- @Test(expected = PoolingFeatureRtException.class)
- public void testDoDeleteManager_Ex() {
-
- // generate exception
- doThrow(new PoolingFeatureRtException()).when(mgr1).afterStop();
-
- pool.afterStop(controller1);
- }
-
private Properties initProperties() {
Properties props = new Properties();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index e0024b79..64573ab0 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -155,7 +155,7 @@ public class PoolingManagerImplTest {
when(factory.makeEventQueue(any())).thenReturn(eventQueue);
when(factory.makeClassExtractors(any())).thenReturn(extractors);
- when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
+ when(factory.makeDmaapManager(any())).thenReturn(dmaap);
when(factory.makeScheduler()).thenReturn(sched);
when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
@@ -188,7 +188,7 @@ public class PoolingManagerImplTest {
@Test
public void testPoolingManagerImpl() throws Exception {
- verify(factory).makeDmaapManager(any(), any());
+ verify(factory).makeDmaapManager(any());
State st = mgr.getCurrent();
assertTrue(st instanceof IdleState);
@@ -215,7 +215,7 @@ public class PoolingManagerImplTest {
public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
// throw an exception when we try to create the dmaap manager
PoolingFeatureException ex = new PoolingFeatureException();
- when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
+ when(factory.makeDmaapManager(any())).thenThrow(ex);
PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
() -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
@@ -272,19 +272,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testBeforeStart_DmaapEx() throws Exception {
- // generate an exception
- PoolingFeatureException ex = new PoolingFeatureException();
- doThrow(ex).when(dmaap).startPublisher();
-
- PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
- assertEquals(ex, ex2);
-
- // should never start the scheduler
- verify(factory, never()).makeScheduler();
- }
-
- @Test
public void testAfterStart() throws Exception {
startMgr();
@@ -317,7 +304,13 @@ public class PoolingManagerImplTest {
@Test
public void testBeforeStop() throws Exception {
startMgr();
+ mgr.startDistributing(makeAssignments(true));
+ // verify that this message is not queued
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+ verify(eventQueue, never()).add(msg);
+
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
@@ -325,6 +318,10 @@ public class PoolingManagerImplTest {
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
+
+ // verify that next message is queued
+ mgr.handle(msg);
+ verify(eventQueue).add(msg);
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
index c56caca8..bc92fa27 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
@@ -177,11 +177,7 @@ public class ForwardTest extends BasicMessageTester<Forward> {
expectCheckValidityFailure(msg -> msg.setNumHops(-1));
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Forward makeValidMessage() {
tcreateMs = System.currentTimeMillis();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
index da78dbe3..43f1afd4 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
@@ -33,11 +33,7 @@ public class HeartbeatTest extends BasicMessageTester<Heartbeat> {
super(Heartbeat.class);
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Heartbeat makeValidMessage() {
Heartbeat msg = new Heartbeat(VALID_HOST, ++sequence);
msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
index 8255034f..7b28afc0 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
@@ -62,11 +62,7 @@ public class IdentificationTest extends MessageWithAssignmentsTester<Identificat
msg.checkValidity();
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Identification makeValidMessage() {
Identification msg = new Identification(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
index 0f58e224..e30d7d09 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
@@ -62,11 +62,7 @@ public class LeaderTest extends MessageWithAssignmentsTester<Leader> {
expectCheckValidityFailure(msg -> msg.setAssignments(asgnNotSmallest));
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Leader makeValidMessage() {
Leader msg = new Leader(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
index 8d0f4a6f..4fe37366 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
@@ -26,11 +26,7 @@ public class OfflineTest extends BasicMessageTester<Offline> {
super(Offline.class);
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Offline makeValidMessage() {
Offline msg = new Offline(VALID_HOST);
msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
index 0b2a986d..e0ab016a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
@@ -26,11 +26,7 @@ public class QueryTest extends BasicMessageTester<Query> {
super(Query.class);
}
- /**
- * Makes a message that will pass the validity check.
- *
- * @return a valid Message
- */
+ @Override
public Query makeValidMessage() {
Query msg = new Query(VALID_HOST);
msg.setChannel(VALID_CHANNEL);