diff options
author | Lusheng Ji <lji@research.att.com> | 2018-08-16 13:50:57 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-08-16 13:50:57 +0000 |
commit | 53ec499d199d2ed8c50ab28554451fb532833275 (patch) | |
tree | 937a7b1530c213b9159f0db50630b983ac8780a6 /dcae-analytics-cdap-plugins/src | |
parent | dc81b9204ecb1fabffb5ff863f69120d3f443d4c (diff) | |
parent | 2a85573c45ec6e8b20c78b0e408044e26d2ecf5d (diff) |
Merge "Fix use try-with-resources sonar issue"
Diffstat (limited to 'dcae-analytics-cdap-plugins/src')
-rw-r--r-- | dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java index 1c291af..2d4b30e 100644 --- a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java +++ b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java @@ -22,6 +22,7 @@ package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; import co.cask.cdap.api.data.format.StructuredRecord; import co.cask.cdap.api.metrics.Metrics; + import com.google.common.base.Optional; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; @@ -63,29 +64,31 @@ public class DMaaPMRReceiver extends Receiver<StructuredRecord> { public void onStart() { // create DMaaP MR Subscriber - final DMaaPMRSubscriber subscriber = - DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); - - // Start a new thread with indefinite loop until receiver is stopped - new Thread() { - @Override - public void run() { - while (!isStopped()) { - storeStructuredRecords(subscriber); - try { - final Integer pollingInterval = pluginConfig.getPollingInterval(); - LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); - TimeUnit.MILLISECONDS.sleep(pollingInterval); - } catch (InterruptedException e) { - final String errorMessage = String.format( - "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); - Thread.currentThread().interrupt(); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - } - }.start(); - + try(final DMaaPMRSubscriber subscriber = + DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig))){ + + // Start a new thread with indefinite loop until receiver is stopped + new Thread() { + @Override + public void run() { + while (!isStopped()) { + storeStructuredRecords(subscriber); + try { + final Integer pollingInterval = pluginConfig.getPollingInterval(); + LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); + TimeUnit.MILLISECONDS.sleep(pollingInterval); + } catch (InterruptedException e) { + final String errorMessage = String.format( + "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); + Thread.currentThread().interrupt(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + } + }.start(); + } catch (Exception e) { + LOG.error("Exception in DMaaPMRReceiver onStart",e); + } } @Override |