diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java | 63 |
1 files changed, 14 insertions, 49 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index abb4da33..eb41f803 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.List; -import java.util.Properties; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicEndpoint; import org.onap.policy.drools.event.comm.TopicListener; @@ -31,7 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Manages the internal DMaaP topic. + * Manages the internal DMaaP topic. Assumes all topics are managed by + * {@link TopicEndpoint#manager}. */ public class DmaapManager { @@ -58,18 +58,6 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSource> sources; - - /** - * Topic sinks. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSink> sinks; - - /** * {@code True} if the consumer is running, {@code false} otherwise. */ private boolean consuming = false; @@ -83,17 +71,14 @@ public class DmaapManager { * Constructs the manager, but does not start the source or sink. * * @param topic name of the internal DMaaP topic - * @param props properties to configure the topic source & sink * @throws PoolingFeatureException if an error occurs */ - public DmaapManager(String topic, Properties props) throws PoolingFeatureException { + public DmaapManager(String topic) throws PoolingFeatureException { logger.info("initializing bus for topic {}", topic); try { this.topic = topic; - this.sources = factory.initTopicSources(props); - this.sinks = factory.initTopicSinks(props); this.topicSource = findTopicSource(); this.topicSink = findTopicSink(); @@ -132,7 +117,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { - for (TopicSource src : sources) { + for (TopicSource src : factory.getTopicSources()) { if (topic.equals(src.getTopic())) { if (src instanceof FilterableTopicSource) { return (FilterableTopicSource) src; @@ -153,7 +138,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the sink doesn't exist */ private TopicSink findTopicSink() throws PoolingFeatureException { - for (TopicSink sink : sinks) { + for (TopicSink sink : factory.getTopicSinks()) { if (topic.equals(sink.getTopic())) { return sink; } @@ -164,22 +149,14 @@ public class DmaapManager { /** * Starts the publisher, if it isn't already running. - * - * @throws PoolingFeatureException if an error occurs */ - public void startPublisher() throws PoolingFeatureException { + public void startPublisher() { if (publishing) { return; } - try { - logger.info("start publishing to topic {}", topic); - topicSink.start(); - publishing = true; - - } catch (IllegalStateException e) { - throw new PoolingFeatureException("cannot start topic sink " + topic, e); - } + logger.info("start publishing to topic {}", topic); + publishing = true; } /** @@ -205,14 +182,8 @@ public class DmaapManager { Thread.currentThread().interrupt(); } - try { - logger.info("stop publishing to topic {}", topic); - publishing = false; - topicSink.stop(); - - } catch (IllegalStateException e) { - logger.error("cannot stop sink for topic {}", topic, e); - } + logger.info("stop publishing to topic {}", topic); + publishing = false; } /** @@ -288,23 +259,17 @@ public class DmaapManager { public static class Factory { /** - * Initializes the topic sources. - * - * @param props properties used to configure the topics * @return the topic sources */ - public List<TopicSource> initTopicSources(Properties props) { - return TopicEndpoint.manager.addTopicSources(props); + public List<TopicSource> getTopicSources() { + return TopicEndpoint.manager.getTopicSources(); } /** - * Initializes the topic sinks. - * - * @param props properties used to configure the topics * @return the topic sinks */ - public List<TopicSink> initTopicSinks(Properties props) { - return TopicEndpoint.manager.addTopicSinks(props); + public List<TopicSink> getTopicSinks() { + return TopicEndpoint.manager.getTopicSinks(); } } |