summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java63
1 files changed, 14 insertions, 49 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index abb4da33..eb41f803 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import java.util.Properties;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicEndpoint;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -31,7 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the internal DMaaP topic.
+ * Manages the internal DMaaP topic. Assumes all topics are managed by
+ * {@link TopicEndpoint#manager}.
*/
public class DmaapManager {
@@ -58,18 +58,6 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSource> sources;
-
- /**
- * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSink> sinks;
-
- /**
* {@code True} if the consumer is running, {@code false} otherwise.
*/
private boolean consuming = false;
@@ -83,17 +71,14 @@ public class DmaapManager {
* Constructs the manager, but does not start the source or sink.
*
* @param topic name of the internal DMaaP topic
- * @param props properties to configure the topic source & sink
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ public DmaapManager(String topic) throws PoolingFeatureException {
logger.info("initializing bus for topic {}", topic);
try {
this.topic = topic;
- this.sources = factory.initTopicSources(props);
- this.sinks = factory.initTopicSinks(props);
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
@@ -132,7 +117,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
- for (TopicSource src : sources) {
+ for (TopicSource src : factory.getTopicSources()) {
if (topic.equals(src.getTopic())) {
if (src instanceof FilterableTopicSource) {
return (FilterableTopicSource) src;
@@ -153,7 +138,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the sink doesn't exist
*/
private TopicSink findTopicSink() throws PoolingFeatureException {
- for (TopicSink sink : sinks) {
+ for (TopicSink sink : factory.getTopicSinks()) {
if (topic.equals(sink.getTopic())) {
return sink;
}
@@ -164,22 +149,14 @@ public class DmaapManager {
/**
* Starts the publisher, if it isn't already running.
- *
- * @throws PoolingFeatureException if an error occurs
*/
- public void startPublisher() throws PoolingFeatureException {
+ public void startPublisher() {
if (publishing) {
return;
}
- try {
- logger.info("start publishing to topic {}", topic);
- topicSink.start();
- publishing = true;
-
- } catch (IllegalStateException e) {
- throw new PoolingFeatureException("cannot start topic sink " + topic, e);
- }
+ logger.info("start publishing to topic {}", topic);
+ publishing = true;
}
/**
@@ -205,14 +182,8 @@ public class DmaapManager {
Thread.currentThread().interrupt();
}
- try {
- logger.info("stop publishing to topic {}", topic);
- publishing = false;
- topicSink.stop();
-
- } catch (IllegalStateException e) {
- logger.error("cannot stop sink for topic {}", topic, e);
- }
+ logger.info("stop publishing to topic {}", topic);
+ publishing = false;
}
/**
@@ -288,23 +259,17 @@ public class DmaapManager {
public static class Factory {
/**
- * Initializes the topic sources.
- *
- * @param props properties used to configure the topics
* @return the topic sources
*/
- public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ public List<TopicSource> getTopicSources() {
+ return TopicEndpoint.manager.getTopicSources();
}
/**
- * Initializes the topic sinks.
- *
- * @param props properties used to configure the topics
* @return the topic sinks
*/
- public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ public List<TopicSink> getTopicSinks() {
+ return TopicEndpoint.manager.getTopicSinks();
}
}