diff options
Diffstat (limited to 'dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java')
-rw-r--r-- | dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java | 134 |
1 files changed, 7 insertions, 127 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java index 2b416e7d..3fc769d3 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * openECOMP : SDN-C * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,134 +21,14 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.response.MRConsumerResponse; -import java.io.File; -import java.io.FileInputStream; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public abstract class SdncDmaapConsumer implements Runnable { +public abstract interface SdncDmaapConsumer extends Runnable { + public abstract void init(Properties baseProperties, String consumerPropertiesPath); - private static final Logger LOG = LoggerFactory - .getLogger(SdncDmaapConsumer.class); + public abstract void processMsg(String msg) throws InvalidMessageException; - private Properties properties = null; - private MRConsumer consumer = null; - private MRConsumerResponse consumerResponse = null; - private boolean running = false; - private boolean ready = false; - private int fetchPause = 5000; // Default pause between fetch - 5 seconds - private int timeout = 15000; // Default timeout - 15 seconds + public abstract boolean isReady(); - public SdncDmaapConsumer() { - - } - - public SdncDmaapConsumer(Properties properties, String propertiesPath) { - init(properties, propertiesPath); - } - - public boolean isReady() { - return ready; - } - - public boolean isRunning() { - return running; - } - - public String getProperty(String name) { - return properties.getProperty(name, ""); - } - - public void init(Properties properties, String propertiesPath) { - - try (FileInputStream in = new FileInputStream(new File(propertiesPath))) { - - LOG.debug("propertiesPath: " + propertiesPath); - this.properties = (Properties) properties.clone(); - this.properties.load(in); - - - String timeoutStr = this.properties.getProperty("timeout"); - LOG.debug("timeoutStr: " + timeoutStr); - - if ((timeoutStr != null) && (timeoutStr.length() > 0)) { - timeout = parseTimeOutValue(timeoutStr); - } - - String fetchPauseStr = this.properties.getProperty("fetchPause"); - LOG.debug("fetchPause(Str): " + fetchPauseStr); - if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) { - fetchPause = parseFetchPause(fetchPauseStr); - } - LOG.debug("fetchPause: " + fetchPause); - - - this.consumer = MRClientFactory.createConsumer(propertiesPath); - ready = true; - } catch (Exception e) { - LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e); - } - } - - private int parseTimeOutValue(String timeoutStr) { - try { - return Integer.parseInt(timeoutStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")"); - } - return timeout; - } - - private int parseFetchPause(String fetchPauseStr) { - try { - return Integer.parseInt(fetchPauseStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")"); - } - return fetchPause; - } - - - @Override - public void run() { - if (ready) { - - running = true; - - while (running) { - - try { - boolean noData = true; - consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1); - for (String msg : consumerResponse.getActualMessages()) { - noData = false; - LOG.info("Received message from DMaaP:\n" + msg); - processMsg(msg); - } - - if (noData) { - pauseThread(); - } - } catch (Exception e) { - LOG.error("Caught exception reading from DMaaP", e); - running = false; - } - } - } - } - - private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause)); - Thread.sleep(fetchPause); - } else { - LOG.info("No data received from fetch. No fetch pause specified - retrying immediately"); - } - } - - abstract public void processMsg(String msg) throws InvalidMessageException; + public abstract boolean isRunning(); } |