aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
diff options
context:
space:
mode:
authorMichael Lando <ml636r@att.com>2018-03-04 14:53:33 +0200
committerMichael Lando <ml636r@att.com>2018-03-07 13:19:05 +0000
commita5445100050e49e83f73424198d73cd72d672a4d (patch)
treecacf4df817df31be23e4e790d1dda857bdae061e /catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
parent51157f92c21976cba4914c378aaa3cba49826931 (diff)
Sync Integ to Master
Change-Id: I71e3acc26fa612127756ac04073a522b9cc6cd74 Issue-ID: SDC-977 Signed-off-by: Gitelman, Tal (tg851x) <tg851x@intl.att.com>
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java76
1 files changed, 76 insertions, 0 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
new file mode 100644
index 0000000000..e0661f4b22
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
@@ -0,0 +1,76 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.att.nsa.mr.client.MRConsumer;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Allows consuming DMAAP topic according to received consumer parameters
+ * Allows processing received messages.
+ */
+@Service
+public class DmaapConsumer {
+ private final ExecutorFactory executorFactory;
+ private final DmaapClientFactory dmaapClientFactory;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class);
+
+ @Autowired
+ private DmaapHealth dmaapHealth;
+ /**
+ * Allows to create an object of type DmaapConsumer
+ * @param executorFactory
+ * @param dmaapClientFactory
+ */
+ @Autowired
+ public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
+ this.executorFactory = executorFactory;
+ this.dmaapClientFactory = dmaapClientFactory;
+ }
+
+ /**
+ * Allows consuming DMAAP topic according to received consumer parameters
+ * @param notificationReceived
+ * @param exceptionHandler
+ * @throws Exception
+ */
+ public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
+
+ DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
+ String topic = dmaapConsumerParams.getTopic();
+ logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
+ MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
+ ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
+ ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
+
+ pollExecutor.scheduleWithFixedDelay(() -> {
+ logger.info("Trying to fetch messages from topic: {}", topic);
+ boolean isTopicAvailable = false;
+ try {
+ Iterable<String> messages = consumer.fetch();
+ isTopicAvailable = true ;
+ if (messages != null) {
+ for (String msg : messages) {
+ logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
+ notificationExecutor.execute(() -> notificationReceived.accept(msg));
+ }
+ }
+ //successfully fetched
+ }
+ catch (Exception e) {
+ logger.error("The exception {} occured upon fetching DMAAP message", e);
+ }
+ dmaapHealth.report( isTopicAvailable );
+ }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
+ }
+
+}