summaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-plugins/src
diff options
context:
space:
mode:
authorLusheng Ji <lji@research.att.com>2018-08-16 13:50:57 +0000
committerGerrit Code Review <gerrit@onap.org>2018-08-16 13:50:57 +0000
commit53ec499d199d2ed8c50ab28554451fb532833275 (patch)
tree937a7b1530c213b9159f0db50630b983ac8780a6 /dcae-analytics-cdap-plugins/src
parentdc81b9204ecb1fabffb5ff863f69120d3f443d4c (diff)
parent2a85573c45ec6e8b20c78b0e408044e26d2ecf5d (diff)
Merge "Fix use try-with-resources sonar issue"
Diffstat (limited to 'dcae-analytics-cdap-plugins/src')
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java49
1 files changed, 26 insertions, 23 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java
index 1c291af..2d4b30e 100644
--- a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java
@@ -22,6 +22,7 @@ package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.metrics.Metrics;
+
import com.google.common.base.Optional;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
@@ -63,29 +64,31 @@ public class DMaaPMRReceiver extends Receiver<StructuredRecord> {
public void onStart() {
// create DMaaP MR Subscriber
- final DMaaPMRSubscriber subscriber =
- DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
-
- // Start a new thread with indefinite loop until receiver is stopped
- new Thread() {
- @Override
- public void run() {
- while (!isStopped()) {
- storeStructuredRecords(subscriber);
- try {
- final Integer pollingInterval = pluginConfig.getPollingInterval();
- LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval);
- TimeUnit.MILLISECONDS.sleep(pollingInterval);
- } catch (InterruptedException e) {
- final String errorMessage = String.format(
- "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e);
- Thread.currentThread().interrupt();
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
- }
- }
- }
- }.start();
-
+ try(final DMaaPMRSubscriber subscriber =
+ DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig))){
+
+ // Start a new thread with indefinite loop until receiver is stopped
+ new Thread() {
+ @Override
+ public void run() {
+ while (!isStopped()) {
+ storeStructuredRecords(subscriber);
+ try {
+ final Integer pollingInterval = pluginConfig.getPollingInterval();
+ LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval);
+ TimeUnit.MILLISECONDS.sleep(pollingInterval);
+ } catch (InterruptedException e) {
+ final String errorMessage = String.format(
+ "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e);
+ Thread.currentThread().interrupt();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+ }
+ }.start();
+ } catch (Exception e) {
+ LOG.error("Exception in DMaaPMRReceiver onStart",e);
+ }
}
@Override