aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java44
1 files changed, 30 insertions, 14 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 98543f29..102eda75 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -58,14 +58,14 @@ public class DmaapManager {
private final TopicSink topicSink;
/**
- * Topic sources. In theory, there's only one item in this list, the
- * internal DMaaP topic.
+ * Topic sources. In theory, there's only one item in this list, the internal DMaaP
+ * topic.
*/
private final List<TopicSource> sources;
/**
- * Topic sinks. In theory, there's only one item in this list, the internal
- * DMaaP topic.
+ * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
+ * topic.
*/
private final List<TopicSink> sinks;
@@ -112,8 +112,8 @@ public class DmaapManager {
}
/**
- * Used by junit tests to set the factory used to create various objects
- * used by this class.
+ * Used by junit tests to set the factory used to create various objects used by this
+ * class.
*
* @param factory the new factory
*/
@@ -129,8 +129,7 @@ public class DmaapManager {
* Finds the topic source associated with the internal DMaaP topic.
*
* @return the topic source
- * @throws PoolingFeatureException if the source doesn't exist or is not
- * filterable
+ * @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : sources) {
@@ -174,6 +173,7 @@ public class DmaapManager {
}
try {
+ logger.info("start publishing to topic {}", topic);
topicSink.start();
publishing = true;
@@ -184,13 +184,28 @@ public class DmaapManager {
/**
* Stops the publisher.
+ *
+ * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued
+ * messages and close
*/
- public void stopPublisher() {
+ public void stopPublisher(long waitMs) {
if (!publishing) {
return;
}
+ /*
+ * Give the sink a chance to transmit messages in the queue. It would be better if
+ * "waitMs" could be passed to sink.stop(), but that isn't an option at this time.
+ */
+ try {
+ Thread.sleep(waitMs);
+
+ } catch (InterruptedException e) {
+ logger.warn("message transmission stopped due to {}", e.getMessage());
+ }
+
try {
+ logger.info("stop publishing to topic {}", topic);
publishing = false;
topicSink.stop();
@@ -209,6 +224,7 @@ public class DmaapManager {
return;
}
+ logger.info("start consuming from topic {}", topic);
topicSource.register(listener);
consuming = true;
}
@@ -223,6 +239,7 @@ public class DmaapManager {
return;
}
+ logger.info("stop consuming from topic {}", topic);
consuming = false;
topicSource.unregister(listener);
}
@@ -230,16 +247,16 @@ public class DmaapManager {
/**
* Sets the server-side filter to be used by the consumer.
*
- * @param filter the filter string, or {@code null} if no filter is to be
- * used
+ * @param filter the filter string, or {@code null} if no filter is to be used
* @throws PoolingFeatureException if the topic is not filterable
*/
public void setFilter(String filter) throws PoolingFeatureException {
try {
+ logger.debug("change filter for topic {} to {}", topic, filter);
topicSource.setFilter(filter);
} catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic);
+ throw new PoolingFeatureException("cannot filter topic " + topic, e);
}
}
@@ -247,8 +264,7 @@ public class DmaapManager {
* Publishes a message to the sink.
*
* @param msg message to be published
- * @throws PoolingFeatureException if an error occurs or the publisher isn't
- * running
+ * @throws PoolingFeatureException if an error occurs or the publisher isn't running
*/
public void publish(String msg) throws PoolingFeatureException {
if (!publishing) {