diff options
Diffstat (limited to 'feature-pooling-dmaap')
15 files changed, 217 insertions, 281 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties new file mode 100644 index 00000000..64a6b063 --- /dev/null +++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties @@ -0,0 +1,89 @@ +### +# ============LICENSE_START======================================================= +# feature-pooling-dmaap +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +# In general, the feature-specific properties begin with "pooling", +# and they may be made specific to a controller by prepending with +# "pooling.<controller-name>", instead. +# +# The available properties and their default values are shown below. + +# Whether or not the feature is enabled. +#pooling.enabled=false + +# The internal DMaaP topic used by a controller. Note: the controller +# name is required for this property. +#pooling.<controller-name>.topic = + +# Maximum number of events to retain in the queue while a new host waits +# to be assigned work. +#pooling.offline.queue.limit=1000 + +# Maximum age, in milliseconds, of events to be retained in the queue. +# Events older than this are discarded. +#pooling.offline.queue.age.milliseconds=60000 + +# Time, in milliseconds, to wait for an "Offline" message to be published +# to DMaaP before the connection may be closed. +#pooling.offline.publish.wait.milliseconds=3000 + +# Time, in milliseconds, to wait for this host's initial heart beat. This +# is used to verify connectivity to the internal DMaaP topic. +#pooling.start.heartbeat.milliseconds=100000 + +# Time, in milliseconds, to wait before attempting to re-active this +# host when it was not assigned any work. +#pooling.reactivate.milliseconds=50000 + +# Time, in milliseconds, to wait for other hosts to identify themselves +# when this host is started. +#pooling.identification.milliseconds=50000 + +# Time, in milliseconds, to wait for heart beats from this host, or its +# predecessor, during the active state. +#pooling.active.heartbeat.milliseconds=50000 + +# Time, in milliseconds, to wait between heart beat generations. +#pooling.inter.heartbeat.milliseconds=15000 + +# Topic used for inter-host communication for a particular controller +# pooling.<controller-name>.topic=XXX + +# These specify how the request id is to be extracted from each type of +# object that may be presented to a controller from distributed topics +# (i.e., topics whose events are to be distributed among multiple hosts) +extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId} +extractor.requestId.org.onap.policy.appc.Response=${commonHeader.requestId} +extractor.requestId.org.onap.policy.appclcm.LcmResponseWrapper=${body.commonHeader.requestId} + + +pooling.amsterdam.enabled=true +pooling.amsterdam.topic=${{AMSTERDAM_POOLING_TOPIC}} + +# the list of sources and sinks should be identical +ueb.source.topics=${{AMSTERDAM_POOLING_TOPIC}} +ueb.sink.topics=${{AMSTERDAM_POOLING_TOPIC}} + +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}} +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey= +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret= + +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}} +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey= +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret= diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index abb4da33..eb41f803 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.List; -import java.util.Properties; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicEndpoint; import org.onap.policy.drools.event.comm.TopicListener; @@ -31,7 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Manages the internal DMaaP topic. + * Manages the internal DMaaP topic. Assumes all topics are managed by + * {@link TopicEndpoint#manager}. */ public class DmaapManager { @@ -58,18 +58,6 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSource> sources; - - /** - * Topic sinks. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSink> sinks; - - /** * {@code True} if the consumer is running, {@code false} otherwise. */ private boolean consuming = false; @@ -83,17 +71,14 @@ public class DmaapManager { * Constructs the manager, but does not start the source or sink. * * @param topic name of the internal DMaaP topic - * @param props properties to configure the topic source & sink * @throws PoolingFeatureException if an error occurs */ - public DmaapManager(String topic, Properties props) throws PoolingFeatureException { + public DmaapManager(String topic) throws PoolingFeatureException { logger.info("initializing bus for topic {}", topic); try { this.topic = topic; - this.sources = factory.initTopicSources(props); - this.sinks = factory.initTopicSinks(props); this.topicSource = findTopicSource(); this.topicSink = findTopicSink(); @@ -132,7 +117,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { - for (TopicSource src : sources) { + for (TopicSource src : factory.getTopicSources()) { if (topic.equals(src.getTopic())) { if (src instanceof FilterableTopicSource) { return (FilterableTopicSource) src; @@ -153,7 +138,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the sink doesn't exist */ private TopicSink findTopicSink() throws PoolingFeatureException { - for (TopicSink sink : sinks) { + for (TopicSink sink : factory.getTopicSinks()) { if (topic.equals(sink.getTopic())) { return sink; } @@ -164,22 +149,14 @@ public class DmaapManager { /** * Starts the publisher, if it isn't already running. - * - * @throws PoolingFeatureException if an error occurs */ - public void startPublisher() throws PoolingFeatureException { + public void startPublisher() { if (publishing) { return; } - try { - logger.info("start publishing to topic {}", topic); - topicSink.start(); - publishing = true; - - } catch (IllegalStateException e) { - throw new PoolingFeatureException("cannot start topic sink " + topic, e); - } + logger.info("start publishing to topic {}", topic); + publishing = true; } /** @@ -205,14 +182,8 @@ public class DmaapManager { Thread.currentThread().interrupt(); } - try { - logger.info("stop publishing to topic {}", topic); - publishing = false; - topicSink.stop(); - - } catch (IllegalStateException e) { - logger.error("cannot stop sink for topic {}", topic, e); - } + logger.info("stop publishing to topic {}", topic); + publishing = false; } /** @@ -288,23 +259,17 @@ public class DmaapManager { public static class Factory { /** - * Initializes the topic sources. - * - * @param props properties used to configure the topics * @return the topic sources */ - public List<TopicSource> initTopicSources(Properties props) { - return TopicEndpoint.manager.addTopicSources(props); + public List<TopicSource> getTopicSources() { + return TopicEndpoint.manager.getTopicSources(); } /** - * Initializes the topic sinks. - * - * @param props properties used to configure the topics * @return the topic sinks */ - public List<TopicSink> initTopicSinks(Properties props) { - return TopicEndpoint.manager.addTopicSinks(props); + public List<TopicSink> getTopicSinks() { + return TopicEndpoint.manager.getTopicSinks(); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 1e2071ab..67cb21ee 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -20,13 +20,16 @@ package org.onap.policy.drools.pooling; +import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.TopicEndpoint; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; @@ -53,7 +56,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Factory used to create objects. */ - private static Factory factory; + private static Factory factory = new Factory(); /** * ID of this host. @@ -124,6 +127,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public boolean beforeStart(PolicyEngine engine) { logger.info("initializing " + PoolingProperties.FEATURE_NAME); featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); + + factory.initTopicSources(featProps); + factory.initTopicSinks(featProps); + return false; } @@ -189,17 +196,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF @Override public boolean afterStop(PolicyController controller) { - - // NOTE: using doDeleteManager() instead of doManager() - - return doDeleteManager(controller, mgr -> { - + return doManager(controller, mgr -> { mgr.afterStop(); return false; }); } @Override + public boolean afterShutdown(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override + public boolean afterHalt(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override public boolean beforeLock(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeLock(); @@ -306,29 +321,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF } /** - * Executes a function using the manager associated with the controller and then - * deletes the manager. Catches any exceptions from the function and re-throws it as a - * runtime exception. + * Deletes the manager associated with a controller. * * @param controller - * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ - private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) { + private void deleteManager(PolicyController controller) { String name = controller.getName(); logger.info("remove feature-pool-dmaap manager for {}", name); - // NOTE: using "remove()" instead of "get()" - - PoolingManagerImpl mgr = ctlr2pool.remove(name); - - if (mgr == null) { - return false; - } - - return func.apply(mgr); + ctlr2pool.remove(name); } /** @@ -416,5 +419,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public PolicyController getController(DroolsController droolsController) { return PolicyController.factory.get(droolsController); } + + /** + * Initializes the topic sources. + * + * @param props properties used to configure the topics + * @return the topic sources + */ + public List<TopicSource> initTopicSources(Properties props) { + return TopicEndpoint.manager.addTopicSources(props); + } + + /** + * Initializes the topic sinks. + * + * @param props properties used to configure the topics + * @return the topic sinks + */ + public List<TopicSink> initTopicSinks(Properties props) { + return TopicEndpoint.manager.addTopicSinks(props); + } } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index de25e471..33f45085 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -45,7 +45,6 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; -import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -177,8 +176,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); - this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), - makeDmaapProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -237,39 +235,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Makes properties for configuring DMaaP. Copies properties from the source that - * start with the Pooling property prefix followed by the controller name, stripping - * the prefix and controller name. - * - * @param controller the controller for which DMaaP will be configured - * @param source properties from which to get the DMaaP properties - * @return DMaaP properties - */ - private Properties makeDmaapProps(PolicyController controller, Properties source) { - SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); - - // could be UEB or DMAAP, so add both - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - - return specProps; - } - - /** - * Adds DMaaP consumer properties, consumer group & instance. The group is the host - * and the instance is a constant. - * - * @param props where to add the new properties - * @param prefix property prefix - */ - private void addDmaapConsumerProps(SpecProperties props, String prefix) { - String fullpfx = props.getSpecPrefix() + prefix + "." + topic; - - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); - } - - /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. * @@ -325,6 +290,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } + + assignments = null; } if (sched != null) { @@ -902,12 +869,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Creates a DMaaP manager. * * @param topic name of the internal DMaaP topic - * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { - return new DmaapManager(topic, props); + public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException { + return new DmaapManager(topic); } /** diff --git a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI index cd59e469..cd59e469 100644 --- a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI +++ b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index a5688df6..6509e90e 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -24,16 +24,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.LinkedList; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.junit.AfterClass; import org.junit.Before; @@ -56,7 +54,6 @@ public class DmaapManagerTest { */ private static Factory saveFactory; - private Properties props; private Factory factory; private TopicListener listener; private FilterableTopicSource source; @@ -75,8 +72,6 @@ public class DmaapManagerTest { @Before public void setUp() throws Exception { - props = new Properties(); - listener = mock(TopicListener.class); factory = mock(Factory.class); source = mock(FilterableTopicSource.class); @@ -90,21 +85,21 @@ public class DmaapManagerTest { when(sink.send(any())).thenReturn(true); // three sources, with the desired one in the middle - when(factory.initTopicSources(props)) + when(factory.getTopicSources()) .thenReturn(Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class))); // three sinks, with the desired one in the middle - when(factory.initTopicSinks(props)) + when(factory.getTopicSinks()) .thenReturn(Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class))); - mgr = new DmaapManager(MY_TOPIC, props); + mgr = new DmaapManager(MY_TOPIC); } @Test public void testDmaapManager() { // verify that the init methods were called - verify(factory).initTopicSinks(props); - verify(factory).initTopicSinks(props); + verify(factory).getTopicSinks(); + verify(factory).getTopicSinks(); } @Test(expected = PoolingFeatureException.class) @@ -112,15 +107,15 @@ public class DmaapManagerTest { // force error by having no topics match when(source.getTopic()).thenReturn(""); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test(expected = PoolingFeatureException.class) public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException { // force error - when(factory.initTopicSources(props)).thenThrow(new IllegalArgumentException("expected")); + when(factory.getTopicSources()).thenThrow(new IllegalArgumentException("expected")); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test(expected = PoolingFeatureException.class) @@ -128,7 +123,7 @@ public class DmaapManagerTest { // force an error when setFilter() is called doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any()); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test @@ -148,25 +143,25 @@ public class DmaapManagerTest { TopicSource source2 = mock(TopicSource.class); when(source2.getTopic()).thenReturn(MY_TOPIC); - when(factory.initTopicSources(props)).thenReturn(Arrays.asList(source2)); + when(factory.getTopicSources()).thenReturn(Arrays.asList(source2)); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test(expected = PoolingFeatureException.class) public void testFindTopicSource_NotFound() throws PoolingFeatureException { // one item in list, and its topic doesn't match - when(factory.initTopicSources(props)).thenReturn(Arrays.asList(mock(TopicSource.class))); + when(factory.getTopicSources()).thenReturn(Arrays.asList(mock(TopicSource.class))); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test(expected = PoolingFeatureException.class) public void testFindTopicSource_EmptyList() throws PoolingFeatureException { // empty list - when(factory.initTopicSources(props)).thenReturn(new LinkedList<>()); + when(factory.getTopicSources()).thenReturn(new LinkedList<>()); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test @@ -177,50 +172,26 @@ public class DmaapManagerTest { @Test(expected = PoolingFeatureException.class) public void testFindTopicSink_NotFound() throws PoolingFeatureException { // one item in list, and its topic doesn't match - when(factory.initTopicSinks(props)).thenReturn(Arrays.asList(mock(TopicSink.class))); + when(factory.getTopicSinks()).thenReturn(Arrays.asList(mock(TopicSink.class))); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test(expected = PoolingFeatureException.class) public void testFindTopicSink_EmptyList() throws PoolingFeatureException { // empty list - when(factory.initTopicSinks(props)).thenReturn(new LinkedList<>()); + when(factory.getTopicSinks()).thenReturn(new LinkedList<>()); - new DmaapManager(MY_TOPIC, props); + new DmaapManager(MY_TOPIC); } @Test public void testStartPublisher() throws PoolingFeatureException { - // not started yet - verify(sink, never()).start(); - + mgr.startPublisher(); - verify(sink).start(); // restart should have no effect mgr.startPublisher(); - verify(sink).start(); - - // should be able to publish now - mgr.publish(MSG); - verify(sink).send(MSG); - } - - @Test - public void testStartPublisher_Exception() throws PoolingFeatureException { - // force exception when it starts - doThrow(new IllegalStateException("expected")).when(sink).start(); - - expectException("startPublisher,start", () -> mgr.startPublisher()); - expectException("startPublisher,publish", () -> mgr.publish(MSG)); - - // allow it to succeed this time - reset(sink); - when(sink.send(any())).thenReturn(true); - - mgr.startPublisher(); - verify(sink).start(); // should be able to publish now mgr.publish(MSG); @@ -231,18 +202,15 @@ public class DmaapManagerTest { public void testStopPublisher() throws PoolingFeatureException { // not publishing yet, so stopping should have no effect mgr.stopPublisher(0); - verify(sink, never()).stop(); // now start it mgr.startPublisher(); // this time, stop should do something mgr.stopPublisher(0); - verify(sink).stop(); // re-stopping should have no effect mgr.stopPublisher(0); - verify(sink).stop(); } @Test @@ -285,16 +253,6 @@ public class DmaapManagerTest { } @Test - public void testStopPublisher_Exception() throws PoolingFeatureException { - mgr.startPublisher(); - - // force exception when it stops - doThrow(new IllegalStateException("expected")).when(sink).stop(); - - mgr.stopPublisher(0); - } - - @Test public void testStartConsumer() { // not started yet verify(source, never()).register(any()); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index cc588384..d453e746 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -874,11 +874,6 @@ public class FeatureTest { private static class TopicImpl implements Topic { /** - * {@code True} if this topic is alive/running, {@code false} otherwise. - */ - private boolean alive = false; - - /** * */ public TopicImpl() { @@ -917,32 +912,22 @@ public class FeatureTest { @Override public synchronized boolean start() { - if (alive) { - throw new IllegalStateException("topic already started"); - } - - alive = true; return true; } @Override public synchronized boolean stop() { - if (!alive) { - throw new IllegalStateException("topic is not running"); - } - - alive = false; return true; } @Override public synchronized void shutdown() { - alive = false; + // do nothing } @Override public synchronized boolean isAlive() { - return alive; + return true; } @Override @@ -1081,12 +1066,12 @@ public class FeatureTest { } @Override - public List<TopicSource> initTopicSources(Properties props) { + public List<TopicSource> getTopicSources() { return Arrays.asList(new TopicSourceImpl(context, true)); } @Override - public List<TopicSink> initTopicSinks(Properties props) { + public List<TopicSink> getTopicSinks() { return Arrays.asList(new TopicSinkImpl(context)); } } diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index f8f37559..32264e3a 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -271,11 +271,28 @@ public class PoolingFeatureTest { assertFalse(pool.afterStop(controller1)); verify(mgr1).afterStop(); - // ensure it has been removed from the map by re-invoking - assertFalse(pool.afterStop(controller1)); - + assertFalse(pool.afterStop(controllerDisabled)); + // count should be unchanged verify(mgr1).afterStop(); + } + + @Test + public void testAfterHalt() { + assertFalse(pool.afterHalt(controller1)); + assertFalse(pool.afterHalt(controller1)); + + verify(mgr1, never()).afterStop(); + + assertFalse(pool.afterStop(controllerDisabled)); + } + + @Test + public void testAfterShutdown() { + assertFalse(pool.afterShutdown(controller1)); + assertFalse(pool.afterShutdown(controller1)); + + verify(mgr1, never()).afterStop(); assertFalse(pool.afterStop(controllerDisabled)); } @@ -464,46 +481,6 @@ public class PoolingFeatureTest { pool.beforeStart(controller1); } - @Test - public void testDoDeleteManager() { - assertFalse(pool.afterStop(controller1)); - verify(mgr1).afterStop(); - - // ensure it has been removed from the map by re-invoking - assertFalse(pool.afterStop(controller1)); - - // count should be unchanged - verify(mgr1).afterStop(); - - - // different controller - assertFalse(pool.afterStop(controller2)); - verify(mgr2).afterStop(); - - // ensure it has been removed from the map by re-invoking - assertFalse(pool.afterStop(controller2)); - - // count should be unchanged - verify(mgr2).afterStop(); - - - assertFalse(pool.afterStop(controllerDisabled)); - } - - @Test - public void testDoDeleteManager_NotFound() { - assertFalse(pool.afterStop(controllerDisabled)); - } - - @Test(expected = PoolingFeatureRtException.class) - public void testDoDeleteManager_Ex() { - - // generate exception - doThrow(new PoolingFeatureRtException()).when(mgr1).afterStop(); - - pool.afterStop(controller1); - } - private Properties initProperties() { Properties props = new Properties(); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index e0024b79..64573ab0 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -155,7 +155,7 @@ public class PoolingManagerImplTest { when(factory.makeEventQueue(any())).thenReturn(eventQueue); when(factory.makeClassExtractors(any())).thenReturn(extractors); - when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap); + when(factory.makeDmaapManager(any())).thenReturn(dmaap); when(factory.makeScheduler()).thenReturn(sched); when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true); when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT); @@ -188,7 +188,7 @@ public class PoolingManagerImplTest { @Test public void testPoolingManagerImpl() throws Exception { - verify(factory).makeDmaapManager(any(), any()); + verify(factory).makeDmaapManager(any()); State st = mgr.getCurrent(); assertTrue(st instanceof IdleState); @@ -215,7 +215,7 @@ public class PoolingManagerImplTest { public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException { // throw an exception when we try to create the dmaap manager PoolingFeatureException ex = new PoolingFeatureException(); - when(factory.makeDmaapManager(any(), any())).thenThrow(ex); + when(factory.makeDmaapManager(any())).thenThrow(ex); PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class, () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active)); @@ -272,19 +272,6 @@ public class PoolingManagerImplTest { } @Test - public void testBeforeStart_DmaapEx() throws Exception { - // generate an exception - PoolingFeatureException ex = new PoolingFeatureException(); - doThrow(ex).when(dmaap).startPublisher(); - - PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart()); - assertEquals(ex, ex2); - - // should never start the scheduler - verify(factory, never()).makeScheduler(); - } - - @Test public void testAfterStart() throws Exception { startMgr(); @@ -317,7 +304,13 @@ public class PoolingManagerImplTest { @Test public void testBeforeStop() throws Exception { startMgr(); + mgr.startDistributing(makeAssignments(true)); + // verify that this message is not queued + Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); + mgr.handle(msg); + verify(eventQueue, never()).add(msg); + mgr.beforeStop(); verify(dmaap).stopConsumer(mgr); @@ -325,6 +318,10 @@ public class PoolingManagerImplTest { verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); + + // verify that next message is queued + mgr.handle(msg); + verify(eventQueue).add(msg); } @Test diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java index c56caca8..bc92fa27 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java @@ -177,11 +177,7 @@ public class ForwardTest extends BasicMessageTester<Forward> { expectCheckValidityFailure(msg -> msg.setNumHops(-1)); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Forward makeValidMessage() { tcreateMs = System.currentTimeMillis(); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java index da78dbe3..43f1afd4 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java @@ -33,11 +33,7 @@ public class HeartbeatTest extends BasicMessageTester<Heartbeat> { super(Heartbeat.class); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Heartbeat makeValidMessage() { Heartbeat msg = new Heartbeat(VALID_HOST, ++sequence); msg.setChannel(VALID_CHANNEL); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java index 8255034f..7b28afc0 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java @@ -62,11 +62,7 @@ public class IdentificationTest extends MessageWithAssignmentsTester<Identificat msg.checkValidity(); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Identification makeValidMessage() { Identification msg = new Identification(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN)); msg.setChannel(VALID_CHANNEL); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java index 0f58e224..e30d7d09 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java @@ -62,11 +62,7 @@ public class LeaderTest extends MessageWithAssignmentsTester<Leader> { expectCheckValidityFailure(msg -> msg.setAssignments(asgnNotSmallest)); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Leader makeValidMessage() { Leader msg = new Leader(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN)); msg.setChannel(VALID_CHANNEL); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java index 8d0f4a6f..4fe37366 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java @@ -26,11 +26,7 @@ public class OfflineTest extends BasicMessageTester<Offline> { super(Offline.class); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Offline makeValidMessage() { Offline msg = new Offline(VALID_HOST); msg.setChannel(VALID_CHANNEL); diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java index 0b2a986d..e0ab016a 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java @@ -26,11 +26,7 @@ public class QueryTest extends BasicMessageTester<Query> { super(Query.class); } - /** - * Makes a message that will pass the validity check. - * - * @return a valid Message - */ + @Override public Query makeValidMessage() { Query msg = new Query(VALID_HOST); msg.setChannel(VALID_CHANNEL); |