aboutsummaryrefslogtreecommitdiffstats
path: root/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java')
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java134
1 files changed, 7 insertions, 127 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
index 2b416e7d..3fc769d3 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
@@ -2,8 +2,8 @@
* ============LICENSE_START=======================================================
* openECOMP : SDN-C
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,134 +21,14 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import java.io.File;
-import java.io.FileInputStream;
import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class SdncDmaapConsumer implements Runnable {
+public abstract interface SdncDmaapConsumer extends Runnable {
+ public abstract void init(Properties baseProperties, String consumerPropertiesPath);
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncDmaapConsumer.class);
+ public abstract void processMsg(String msg) throws InvalidMessageException;
- private Properties properties = null;
- private MRConsumer consumer = null;
- private MRConsumerResponse consumerResponse = null;
- private boolean running = false;
- private boolean ready = false;
- private int fetchPause = 5000; // Default pause between fetch - 5 seconds
- private int timeout = 15000; // Default timeout - 15 seconds
+ public abstract boolean isReady();
- public SdncDmaapConsumer() {
-
- }
-
- public SdncDmaapConsumer(Properties properties, String propertiesPath) {
- init(properties, propertiesPath);
- }
-
- public boolean isReady() {
- return ready;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- public String getProperty(String name) {
- return properties.getProperty(name, "");
- }
-
- public void init(Properties properties, String propertiesPath) {
-
- try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
-
- LOG.debug("propertiesPath: " + propertiesPath);
- this.properties = (Properties) properties.clone();
- this.properties.load(in);
-
-
- String timeoutStr = this.properties.getProperty("timeout");
- LOG.debug("timeoutStr: " + timeoutStr);
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- timeout = parseTimeOutValue(timeoutStr);
- }
-
- String fetchPauseStr = this.properties.getProperty("fetchPause");
- LOG.debug("fetchPause(Str): " + fetchPauseStr);
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- fetchPause = parseFetchPause(fetchPauseStr);
- }
- LOG.debug("fetchPause: " + fetchPause);
-
-
- this.consumer = MRClientFactory.createConsumer(propertiesPath);
- ready = true;
- } catch (Exception e) {
- LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
- }
- }
-
- private int parseTimeOutValue(String timeoutStr) {
- try {
- return Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
- }
- return timeout;
- }
-
- private int parseFetchPause(String fetchPauseStr) {
- try {
- return Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
- }
- return fetchPause;
- }
-
-
- @Override
- public void run() {
- if (ready) {
-
- running = true;
-
- while (running) {
-
- try {
- boolean noData = true;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
- noData = false;
- LOG.info("Received message from DMaaP:\n" + msg);
- processMsg(msg);
- }
-
- if (noData) {
- pauseThread();
- }
- } catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP", e);
- running = false;
- }
- }
- }
- }
-
- private void pauseThread() throws InterruptedException {
- if (fetchPause > 0) {
- LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
- Thread.sleep(fetchPause);
- } else {
- LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
- }
- }
-
- abstract public void processMsg(String msg) throws InvalidMessageException;
+ public abstract boolean isRunning();
}