diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java | 44 |
1 files changed, 30 insertions, 14 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 98543f29..102eda75 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -58,14 +58,14 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the - * internal DMaaP topic. + * Topic sources. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSource> sources; /** - * Topic sinks. In theory, there's only one item in this list, the internal - * DMaaP topic. + * Topic sinks. In theory, there's only one item in this list, the internal DMaaP + * topic. */ private final List<TopicSink> sinks; @@ -112,8 +112,8 @@ public class DmaapManager { } /** - * Used by junit tests to set the factory used to create various objects - * used by this class. + * Used by junit tests to set the factory used to create various objects used by this + * class. * * @param factory the new factory */ @@ -129,8 +129,7 @@ public class DmaapManager { * Finds the topic source associated with the internal DMaaP topic. * * @return the topic source - * @throws PoolingFeatureException if the source doesn't exist or is not - * filterable + * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { for (TopicSource src : sources) { @@ -174,6 +173,7 @@ public class DmaapManager { } try { + logger.info("start publishing to topic {}", topic); topicSink.start(); publishing = true; @@ -184,13 +184,28 @@ public class DmaapManager { /** * Stops the publisher. + * + * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued + * messages and close */ - public void stopPublisher() { + public void stopPublisher(long waitMs) { if (!publishing) { return; } + /* + * Give the sink a chance to transmit messages in the queue. It would be better if + * "waitMs" could be passed to sink.stop(), but that isn't an option at this time. + */ + try { + Thread.sleep(waitMs); + + } catch (InterruptedException e) { + logger.warn("message transmission stopped due to {}", e.getMessage()); + } + try { + logger.info("stop publishing to topic {}", topic); publishing = false; topicSink.stop(); @@ -209,6 +224,7 @@ public class DmaapManager { return; } + logger.info("start consuming from topic {}", topic); topicSource.register(listener); consuming = true; } @@ -223,6 +239,7 @@ public class DmaapManager { return; } + logger.info("stop consuming from topic {}", topic); consuming = false; topicSource.unregister(listener); } @@ -230,16 +247,16 @@ public class DmaapManager { /** * Sets the server-side filter to be used by the consumer. * - * @param filter the filter string, or {@code null} if no filter is to be - * used + * @param filter the filter string, or {@code null} if no filter is to be used * @throws PoolingFeatureException if the topic is not filterable */ public void setFilter(String filter) throws PoolingFeatureException { try { + logger.debug("change filter for topic {} to {}", topic, filter); topicSource.setFilter(filter); } catch (UnsupportedOperationException e) { - throw new PoolingFeatureException("cannot filter topic " + topic); + throw new PoolingFeatureException("cannot filter topic " + topic, e); } } @@ -247,8 +264,7 @@ public class DmaapManager { * Publishes a message to the sink. * * @param msg message to be published - * @throws PoolingFeatureException if an error occurs or the publisher isn't - * running + * @throws PoolingFeatureException if an error occurs or the publisher isn't running */ public void publish(String msg) throws PoolingFeatureException { if (!publishing) { |