diff options
Diffstat (limited to 'dcae-analytics-cdap-plugins')
2 files changed, 27 insertions, 24 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java index 1c291af..2d4b30e 100644 --- a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java +++ b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java @@ -22,6 +22,7 @@ package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; import co.cask.cdap.api.data.format.StructuredRecord; import co.cask.cdap.api.metrics.Metrics; + import com.google.common.base.Optional; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; @@ -63,29 +64,31 @@ public class DMaaPMRReceiver extends Receiver<StructuredRecord> { public void onStart() { // create DMaaP MR Subscriber - final DMaaPMRSubscriber subscriber = - DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); - - // Start a new thread with indefinite loop until receiver is stopped - new Thread() { - @Override - public void run() { - while (!isStopped()) { - storeStructuredRecords(subscriber); - try { - final Integer pollingInterval = pluginConfig.getPollingInterval(); - LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); - TimeUnit.MILLISECONDS.sleep(pollingInterval); - } catch (InterruptedException e) { - final String errorMessage = String.format( - "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); - Thread.currentThread().interrupt(); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - } - }.start(); - + try(final DMaaPMRSubscriber subscriber = + DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig))){ + + // Start a new thread with indefinite loop until receiver is stopped + new Thread() { + @Override + public void run() { + while (!isStopped()) { + storeStructuredRecords(subscriber); + try { + final Integer pollingInterval = pluginConfig.getPollingInterval(); + LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); + TimeUnit.MILLISECONDS.sleep(pollingInterval); + } catch (InterruptedException e) { + final String errorMessage = String.format( + "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); + Thread.currentThread().interrupt(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + } + }.start(); + } catch (Exception e) { + LOG.error("Exception in DMaaPMRReceiver onStart",e); + } } @Override diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java index 5f622cd..657f0af 100644 --- a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java +++ b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java @@ -53,7 +53,7 @@ public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils { public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() { @Override - public Schema.Type apply(@Nonnull Schema schema) { + public Schema.Type apply(Schema schema) { return schema.getType(); } }; |