diff options
author | 2018-03-04 14:53:33 +0200 | |
---|---|---|
committer | 2018-03-07 13:19:05 +0000 | |
commit | a5445100050e49e83f73424198d73cd72d672a4d (patch) | |
tree | cacf4df817df31be23e4e790d1dda857bdae061e /catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java | |
parent | 51157f92c21976cba4914c378aaa3cba49826931 (diff) |
Sync Integ to Master
Change-Id: I71e3acc26fa612127756ac04073a522b9cc6cd74
Issue-ID: SDC-977
Signed-off-by: Gitelman, Tal (tg851x) <tg851x@intl.att.com>
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java new file mode 100644 index 0000000000..e0661f4b22 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java @@ -0,0 +1,76 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.nsa.mr.client.MRConsumer; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Allows consuming DMAAP topic according to received consumer parameters + * Allows processing received messages. + */ +@Service +public class DmaapConsumer { + private final ExecutorFactory executorFactory; + private final DmaapClientFactory dmaapClientFactory; + private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class); + + @Autowired + private DmaapHealth dmaapHealth; + /** + * Allows to create an object of type DmaapConsumer + * @param executorFactory + * @param dmaapClientFactory + */ + @Autowired + public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) { + this.executorFactory = executorFactory; + this.dmaapClientFactory = dmaapClientFactory; + } + + /** + * Allows consuming DMAAP topic according to received consumer parameters + * @param notificationReceived + * @param exceptionHandler + * @throws Exception + */ + public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception { + + DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration(); + String topic = dmaapConsumerParams.getTopic(); + logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams); + MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams); + ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client"); + ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler); + + pollExecutor.scheduleWithFixedDelay(() -> { + logger.info("Trying to fetch messages from topic: {}", topic); + boolean isTopicAvailable = false; + try { + Iterable<String> messages = consumer.fetch(); + isTopicAvailable = true ; + if (messages != null) { + for (String msg : messages) { + logger.info("The DMAAP message {} received. The topic is {}.", msg, topic); + notificationExecutor.execute(() -> notificationReceived.accept(msg)); + } + } + //successfully fetched + } + catch (Exception e) { + logger.error("The exception {} occured upon fetching DMAAP message", e); + } + dmaapHealth.report( isTopicAvailable ); + }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS); + } + +} |