aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-05-03 14:47:23 -0400
committerJim Hahn <jrh3@att.com>2018-05-04 10:20:48 -0400
commit3b6bb8e880d3b56afb7767ea7d0505ceb49f8890 (patch)
tree40147927cb0b2a2da59680aed3999257e3e6498c /feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
parentd8941a17b30fdc6d24f1c5e9d84685c1fafd7ee6 (diff)
Fix various problems in pooling
Renamed META-INF SessionAPI to EngineAPI, as it implements the latter. Created default PoolingFeature.factory object. Don't delete a controller's pooling manager when stop is called; do that in afterHalt and afterShutdown. This enables it to be restarted as long as the controller still exists. Only stop & start the internal DMaaP topic at the engine level instead of the controller level. This is necessary to prevent sinks for ALL controllers from being started each time an individual controller starts. Clear all bucket assignments when controller is stopped. Mark test methods with @Override annotation. Add default property file for pooling feature. Add license to default property file. Remove tests for doDeleteManager(), as it no longer exists. Changed " = " to "=" in the property file. Change-Id: I80c0c3f1879b5a320044db93e3dfa3b7281cda51 Issue-ID: POLICY-774 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java63
1 files changed, 14 insertions, 49 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index abb4da33..eb41f803 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import java.util.Properties;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicEndpoint;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -31,7 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the internal DMaaP topic.
+ * Manages the internal DMaaP topic. Assumes all topics are managed by
+ * {@link TopicEndpoint#manager}.
*/
public class DmaapManager {
@@ -58,18 +58,6 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSource> sources;
-
- /**
- * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSink> sinks;
-
- /**
* {@code True} if the consumer is running, {@code false} otherwise.
*/
private boolean consuming = false;
@@ -83,17 +71,14 @@ public class DmaapManager {
* Constructs the manager, but does not start the source or sink.
*
* @param topic name of the internal DMaaP topic
- * @param props properties to configure the topic source & sink
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ public DmaapManager(String topic) throws PoolingFeatureException {
logger.info("initializing bus for topic {}", topic);
try {
this.topic = topic;
- this.sources = factory.initTopicSources(props);
- this.sinks = factory.initTopicSinks(props);
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
@@ -132,7 +117,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
- for (TopicSource src : sources) {
+ for (TopicSource src : factory.getTopicSources()) {
if (topic.equals(src.getTopic())) {
if (src instanceof FilterableTopicSource) {
return (FilterableTopicSource) src;
@@ -153,7 +138,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the sink doesn't exist
*/
private TopicSink findTopicSink() throws PoolingFeatureException {
- for (TopicSink sink : sinks) {
+ for (TopicSink sink : factory.getTopicSinks()) {
if (topic.equals(sink.getTopic())) {
return sink;
}
@@ -164,22 +149,14 @@ public class DmaapManager {
/**
* Starts the publisher, if it isn't already running.
- *
- * @throws PoolingFeatureException if an error occurs
*/
- public void startPublisher() throws PoolingFeatureException {
+ public void startPublisher() {
if (publishing) {
return;
}
- try {
- logger.info("start publishing to topic {}", topic);
- topicSink.start();
- publishing = true;
-
- } catch (IllegalStateException e) {
- throw new PoolingFeatureException("cannot start topic sink " + topic, e);
- }
+ logger.info("start publishing to topic {}", topic);
+ publishing = true;
}
/**
@@ -205,14 +182,8 @@ public class DmaapManager {
Thread.currentThread().interrupt();
}
- try {
- logger.info("stop publishing to topic {}", topic);
- publishing = false;
- topicSink.stop();
-
- } catch (IllegalStateException e) {
- logger.error("cannot stop sink for topic {}", topic, e);
- }
+ logger.info("stop publishing to topic {}", topic);
+ publishing = false;
}
/**
@@ -288,23 +259,17 @@ public class DmaapManager {
public static class Factory {
/**
- * Initializes the topic sources.
- *
- * @param props properties used to configure the topics
* @return the topic sources
*/
- public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ public List<TopicSource> getTopicSources() {
+ return TopicEndpoint.manager.getTopicSources();
}
/**
- * Initializes the topic sinks.
- *
- * @param props properties used to configure the topics
* @return the topic sinks
*/
- public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ public List<TopicSink> getTopicSinks() {
+ return TopicEndpoint.manager.getTopicSinks();
}
}