summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java63
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java67
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java44
3 files changed, 64 insertions, 110 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index abb4da33..eb41f803 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import java.util.Properties;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicEndpoint;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -31,7 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the internal DMaaP topic.
+ * Manages the internal DMaaP topic. Assumes all topics are managed by
+ * {@link TopicEndpoint#manager}.
*/
public class DmaapManager {
@@ -58,18 +58,6 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSource> sources;
-
- /**
- * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
- * topic.
- */
- private final List<TopicSink> sinks;
-
- /**
* {@code True} if the consumer is running, {@code false} otherwise.
*/
private boolean consuming = false;
@@ -83,17 +71,14 @@ public class DmaapManager {
* Constructs the manager, but does not start the source or sink.
*
* @param topic name of the internal DMaaP topic
- * @param props properties to configure the topic source & sink
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ public DmaapManager(String topic) throws PoolingFeatureException {
logger.info("initializing bus for topic {}", topic);
try {
this.topic = topic;
- this.sources = factory.initTopicSources(props);
- this.sinks = factory.initTopicSinks(props);
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
@@ -132,7 +117,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
- for (TopicSource src : sources) {
+ for (TopicSource src : factory.getTopicSources()) {
if (topic.equals(src.getTopic())) {
if (src instanceof FilterableTopicSource) {
return (FilterableTopicSource) src;
@@ -153,7 +138,7 @@ public class DmaapManager {
* @throws PoolingFeatureException if the sink doesn't exist
*/
private TopicSink findTopicSink() throws PoolingFeatureException {
- for (TopicSink sink : sinks) {
+ for (TopicSink sink : factory.getTopicSinks()) {
if (topic.equals(sink.getTopic())) {
return sink;
}
@@ -164,22 +149,14 @@ public class DmaapManager {
/**
* Starts the publisher, if it isn't already running.
- *
- * @throws PoolingFeatureException if an error occurs
*/
- public void startPublisher() throws PoolingFeatureException {
+ public void startPublisher() {
if (publishing) {
return;
}
- try {
- logger.info("start publishing to topic {}", topic);
- topicSink.start();
- publishing = true;
-
- } catch (IllegalStateException e) {
- throw new PoolingFeatureException("cannot start topic sink " + topic, e);
- }
+ logger.info("start publishing to topic {}", topic);
+ publishing = true;
}
/**
@@ -205,14 +182,8 @@ public class DmaapManager {
Thread.currentThread().interrupt();
}
- try {
- logger.info("stop publishing to topic {}", topic);
- publishing = false;
- topicSink.stop();
-
- } catch (IllegalStateException e) {
- logger.error("cannot stop sink for topic {}", topic, e);
- }
+ logger.info("stop publishing to topic {}", topic);
+ publishing = false;
}
/**
@@ -288,23 +259,17 @@ public class DmaapManager {
public static class Factory {
/**
- * Initializes the topic sources.
- *
- * @param props properties used to configure the topics
* @return the topic sources
*/
- public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ public List<TopicSource> getTopicSources() {
+ return TopicEndpoint.manager.getTopicSources();
}
/**
- * Initializes the topic sinks.
- *
- * @param props properties used to configure the topics
* @return the topic sinks
*/
- public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ public List<TopicSink> getTopicSinks() {
+ return TopicEndpoint.manager.getTopicSinks();
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 1e2071ab..67cb21ee 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -20,13 +20,16 @@
package org.onap.policy.drools.pooling;
+import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
@@ -53,7 +56,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Factory used to create objects.
*/
- private static Factory factory;
+ private static Factory factory = new Factory();
/**
* ID of this host.
@@ -124,6 +127,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public boolean beforeStart(PolicyEngine engine) {
logger.info("initializing " + PoolingProperties.FEATURE_NAME);
featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
+
+ factory.initTopicSources(featProps);
+ factory.initTopicSinks(featProps);
+
return false;
}
@@ -189,17 +196,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean afterStop(PolicyController controller) {
-
- // NOTE: using doDeleteManager() instead of doManager()
-
- return doDeleteManager(controller, mgr -> {
-
+ return doManager(controller, mgr -> {
mgr.afterStop();
return false;
});
}
@Override
+ public boolean afterShutdown(PolicyController controller) {
+ deleteManager(controller);
+ return false;
+ }
+
+ @Override
+ public boolean afterHalt(PolicyController controller) {
+ deleteManager(controller);
+ return false;
+ }
+
+ @Override
public boolean beforeLock(PolicyController controller) {
return doManager(controller, mgr -> {
mgr.beforeLock();
@@ -306,29 +321,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
/**
- * Executes a function using the manager associated with the controller and then
- * deletes the manager. Catches any exceptions from the function and re-throws it as a
- * runtime exception.
+ * Deletes the manager associated with a controller.
*
* @param controller
- * @param func function to be executed
- * @return {@code true} if the function handled the request, {@code false} otherwise
* @throws PoolingFeatureRtException if an error occurs
*/
- private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
+ private void deleteManager(PolicyController controller) {
String name = controller.getName();
logger.info("remove feature-pool-dmaap manager for {}", name);
- // NOTE: using "remove()" instead of "get()"
-
- PoolingManagerImpl mgr = ctlr2pool.remove(name);
-
- if (mgr == null) {
- return false;
- }
-
- return func.apply(mgr);
+ ctlr2pool.remove(name);
}
/**
@@ -416,5 +419,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public PolicyController getController(DroolsController droolsController) {
return PolicyController.factory.get(droolsController);
}
+
+ /**
+ * Initializes the topic sources.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sources
+ */
+ public List<TopicSource> initTopicSources(Properties props) {
+ return TopicEndpoint.manager.addTopicSources(props);
+ }
+
+ /**
+ * Initializes the topic sinks.
+ *
+ * @param props properties used to configure the topics
+ * @return the topic sinks
+ */
+ public List<TopicSink> initTopicSinks(Properties props) {
+ return TopicEndpoint.manager.addTopicSinks(props);
+ }
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index de25e471..33f45085 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -45,7 +45,6 @@ import org.onap.policy.drools.pooling.state.QueryState;
import org.onap.policy.drools.pooling.state.StartState;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
-import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
@@ -177,8 +176,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
- this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
- makeDmaapProps(controller, props.getSource()));
+ this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -237,39 +235,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring DMaaP. Copies properties from the source that
- * start with the Pooling property prefix followed by the controller name, stripping
- * the prefix and controller name.
- *
- * @param controller the controller for which DMaaP will be configured
- * @param source properties from which to get the DMaaP properties
- * @return DMaaP properties
- */
- private Properties makeDmaapProps(PolicyController controller, Properties source) {
- SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
-
- // could be UEB or DMAAP, so add both
- addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
- addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-
- return specProps;
- }
-
- /**
- * Adds DMaaP consumer properties, consumer group & instance. The group is the host
- * and the instance is a constant.
- *
- * @param props where to add the new properties
- * @param prefix property prefix
- */
- private void addDmaapConsumerProps(SpecProperties props, String prefix) {
- String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-
- props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
- props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*
@@ -325,6 +290,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
dmaapMgr.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
+
+ assignments = null;
}
if (sched != null) {
@@ -902,12 +869,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* Creates a DMaaP manager.
*
* @param topic name of the internal DMaaP topic
- * @param props properties used to configure DMaaP
* @return a new DMaaP manager
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
- return new DmaapManager(topic, props);
+ public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
+ return new DmaapManager(topic);
}
/**