summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSmokowski, Kevin (ks6305) <ks6305@att.com>2018-06-05 15:57:12 +0000
committerSmokowski, Kevin (ks6305) <ks6305@att.com>2018-06-05 15:57:12 +0000
commitf1b7fecfbe875059793a6ece34b3743f379bb030 (patch)
tree1404a6e3bb2460d2b842929c91d08152c3352868
parent0f3fdfae6226b86ffc49e39b365eb149fce0e35f (diff)
Introduce new dmaapClient for use with ccsdk
Create an interface and turn existing class into impl file. Write new client which implements the new interface Change-Id: I127bef09bd7f07556a18b753e428e3cd75f5f498 Issue-ID: CCSDK-294 Signed-off-by: Smokowski, Kevin (ks6305) <ks6305@att.com>
-rwxr-xr-xdmaap-listener/pom.xml15
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java2
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java194
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java8
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java7
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java134
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java159
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java8
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java2
9 files changed, 383 insertions, 146 deletions
diff --git a/dmaap-listener/pom.xml b/dmaap-listener/pom.xml
index 6e8b2c05..85f3028f 100755
--- a/dmaap-listener/pom.xml
+++ b/dmaap-listener/pom.xml
@@ -30,6 +30,17 @@
<SWM_VERSION>${project.version}-${build.number}</SWM_VERSION>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <!-- dmaapClient needs this version of this jar -->
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>2.1</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
@@ -105,8 +116,8 @@
<version>2.5.1</version>
<inherited>true</inherited>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
index a8336342..57fcd880 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
@@ -24,7 +24,7 @@ package org.onap.ccsdk.sli.northbound.dmaapclient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DummyDmaapConsumer extends SdncDmaapConsumer {
+public class DummyDmaapConsumer extends SdncDmaapConsumerImpl {
private static final Logger LOG = LoggerFactory
.getLogger(DummyDmaapConsumer.class);
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
new file mode 100644
index 00000000..234a2026
--- /dev/null
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
@@ -0,0 +1,194 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.Invocation.Builder;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * jax-rs based client to build message router consumers
+ */
+public class MessageRouterHttpClient implements SdncDmaapConsumer {
+ private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClient.class);
+
+ protected Boolean isReady = false;
+ protected Boolean isRunning = false;
+ protected Client client;
+ protected URI uri;
+ protected Invocation getMessages;
+ protected Integer fetchPause;
+ protected Properties properties;
+ protected final String DEFAULT_CONNECT_TIMEOUT_SECONDS = "30";
+ protected final String DEFAULT_READ_TIMEOUT_MINUTES = "3";
+ protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
+ protected final String DEFAULT_LIMIT = null;
+
+ public MessageRouterHttpClient() {
+
+ }
+
+ @Override
+ public void run() {
+ if (isReady) {
+ isRunning = true;
+ while (isRunning) {
+ try {
+ Response response = getMessages.invoke();
+ Log.info("GET " + uri + " returned http status " + response.getStatus());
+ String entity = response.readEntity(String.class);
+ if (entity.contains("{")) {
+ // Get rid of opening ["
+ entity = entity.substring(2);
+ // Get rid of closing "]
+ entity = entity.substring(0, entity.length() - 2);
+ // This replacement effectively un-escapes the JSON
+ for (String message : entity.split("\",\"")) {
+ try {
+ processMsg(message.replace("\\\"", "\""));
+ } catch (InvalidMessageException e) {
+ Log.error("Message could not be processed", e);
+ }
+ }
+ } else {
+ Log.info("Entity doesn't appear to contain JSON elements");
+ }
+ } catch (Exception e) {
+ Log.error("GET " + uri + " failed.", e);
+ } finally {
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
+ try {
+ Thread.sleep(fetchPause);
+ } catch (InterruptedException e) {
+ Log.error("Could not sleep thread", e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void init(Properties baseProperties, String consumerPropertiesPath) {
+ try {
+ baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
+ this.properties = baseProperties;
+ String username = baseProperties.getProperty("username");
+ String password = baseProperties.getProperty("password");
+ String topic = baseProperties.getProperty("topic");
+ String group = baseProperties.getProperty("group");
+ String host = baseProperties.getProperty("host");
+ String id = baseProperties.getProperty("id");
+
+ String filter = baseProperties.getProperty("filter");
+ if (filter != null) {
+ if (filter.length() > 0) {
+ filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
+ } else {
+ filter = null;
+ }
+ }
+
+ String limitString = baseProperties.getProperty("limit", DEFAULT_LIMIT);
+ Integer limit = null;
+ if (limitString != null && limitString.length() > 0) {
+ limit = Integer.valueOf(limitString);
+ }
+
+ Integer timeoutQueryParamValue =
+ Integer.valueOf(baseProperties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
+ Integer connectTimeoutSeconds = Integer
+ .valueOf(baseProperties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS));
+ Integer readTimeoutMinutes =
+ Integer.valueOf(baseProperties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES));
+
+ String authorizationString = buildAuthorizationString(username, password);
+ this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter);
+ this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes);
+ Builder builder =
+ client.target(uri).request("application/json").header("Authorization", authorizationString);
+ this.getMessages = builder.buildGet();
+ this.fetchPause = Integer.valueOf(baseProperties.getProperty("fetchPause"));
+ this.isReady = true;
+ } catch (FileNotFoundException e) {
+ Log.error("FileNotFoundException while reading consumer properties", e);
+ } catch (IOException e) {
+ Log.error("IOException while reading consumer properties", e);
+ }
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+ System.out.println(msg);
+ }
+
+ @Override
+ public boolean isReady() {
+ return isReady;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ protected String buildAuthorizationString(String userName, String password) {
+ String basicAuthString = userName + ":" + password;
+ basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
+ return "Basic " + basicAuthString;
+ }
+
+ protected Client getClient(Integer connectTimeoutSeconds, Integer readTimeoutMinutes) {
+ ClientBuilder clientBuilder = ClientBuilder.newBuilder();
+ clientBuilder.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS);
+ clientBuilder.readTimeout(readTimeoutMinutes, TimeUnit.MINUTES);
+ return clientBuilder.build();
+ }
+
+ protected URI buildUri(String topic, String consumerGroup, String consumerId, String host, Integer timeout,
+ Integer limit, String filter) {
+ UriBuilder builder = UriBuilder.fromPath("http://" + host + "/events/{topic}/{consumerGroup}/{consumderId}");
+ builder.queryParam("timeout", timeout);
+ if (limit != null) {
+ builder.queryParam("limit", limit);
+ }
+ if (filter != null) {
+ builder.queryParam("filter", filter);
+ }
+ return builder.build(topic, consumerGroup, consumerId);
+ }
+
+}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
index 0e12dfa2..2c4de710 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
@@ -21,9 +21,6 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -34,14 +31,15 @@ import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
-public class SdncAaiDmaapConsumer extends SdncDmaapConsumer {
+public class SdncAaiDmaapConsumer extends SdncDmaapConsumerImpl {
private static final Logger LOG = LoggerFactory.getLogger(SdncAaiDmaapConsumer.class);
private static final String SDNC_ENDPOINT = "SDNC.endpoint";
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
index 03560d30..7b68ceb6 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
@@ -5,19 +5,14 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Properties;
-
import org.onap.ccsdk.sli.core.dblib.DBResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-public class SdncDhcpEventConsumer extends SdncDmaapConsumer {
+public class SdncDhcpEventConsumer extends SdncDmaapConsumerImpl {
private static final Logger LOG = LoggerFactory.getLogger(SdncDhcpEventConsumer.class);
private static final String MAC_ADDR_TAG = "macaddr";
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
index 2b416e7d..3fc769d3 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
@@ -2,8 +2,8 @@
* ============LICENSE_START=======================================================
* openECOMP : SDN-C
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,134 +21,14 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import java.io.File;
-import java.io.FileInputStream;
import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class SdncDmaapConsumer implements Runnable {
+public abstract interface SdncDmaapConsumer extends Runnable {
+ public abstract void init(Properties baseProperties, String consumerPropertiesPath);
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncDmaapConsumer.class);
+ public abstract void processMsg(String msg) throws InvalidMessageException;
- private Properties properties = null;
- private MRConsumer consumer = null;
- private MRConsumerResponse consumerResponse = null;
- private boolean running = false;
- private boolean ready = false;
- private int fetchPause = 5000; // Default pause between fetch - 5 seconds
- private int timeout = 15000; // Default timeout - 15 seconds
+ public abstract boolean isReady();
- public SdncDmaapConsumer() {
-
- }
-
- public SdncDmaapConsumer(Properties properties, String propertiesPath) {
- init(properties, propertiesPath);
- }
-
- public boolean isReady() {
- return ready;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- public String getProperty(String name) {
- return properties.getProperty(name, "");
- }
-
- public void init(Properties properties, String propertiesPath) {
-
- try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
-
- LOG.debug("propertiesPath: " + propertiesPath);
- this.properties = (Properties) properties.clone();
- this.properties.load(in);
-
-
- String timeoutStr = this.properties.getProperty("timeout");
- LOG.debug("timeoutStr: " + timeoutStr);
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- timeout = parseTimeOutValue(timeoutStr);
- }
-
- String fetchPauseStr = this.properties.getProperty("fetchPause");
- LOG.debug("fetchPause(Str): " + fetchPauseStr);
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- fetchPause = parseFetchPause(fetchPauseStr);
- }
- LOG.debug("fetchPause: " + fetchPause);
-
-
- this.consumer = MRClientFactory.createConsumer(propertiesPath);
- ready = true;
- } catch (Exception e) {
- LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
- }
- }
-
- private int parseTimeOutValue(String timeoutStr) {
- try {
- return Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
- }
- return timeout;
- }
-
- private int parseFetchPause(String fetchPauseStr) {
- try {
- return Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
- }
- return fetchPause;
- }
-
-
- @Override
- public void run() {
- if (ready) {
-
- running = true;
-
- while (running) {
-
- try {
- boolean noData = true;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
- noData = false;
- LOG.info("Received message from DMaaP:\n" + msg);
- processMsg(msg);
- }
-
- if (noData) {
- pauseThread();
- }
- } catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP", e);
- running = false;
- }
- }
- }
- }
-
- private void pauseThread() throws InterruptedException {
- if (fetchPause > 0) {
- LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
- Thread.sleep(fetchPause);
- } else {
- LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
- }
- }
-
- abstract public void processMsg(String msg) throws InvalidMessageException;
+ public abstract boolean isRunning();
}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
new file mode 100644
index 00000000..ddd87132
--- /dev/null
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
@@ -0,0 +1,159 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+
+public abstract class SdncDmaapConsumerImpl implements SdncDmaapConsumer {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SdncDmaapConsumer.class);
+
+ private final String name = this.getClass().getSimpleName();
+ private Properties properties = null;
+ private MRConsumer consumer = null;
+ private MRConsumerResponse consumerResponse = null;
+ private boolean running = false;
+ private boolean ready = false;
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+ private int timeout = 15000; // Default timeout - 15 seconds
+
+ public SdncDmaapConsumerImpl() {
+
+ }
+
+ public SdncDmaapConsumerImpl(Properties properties, String propertiesPath) {
+ init(properties, propertiesPath);
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public String getProperty(String name) {
+ return properties.getProperty(name, "");
+ }
+
+ public void init(Properties properties, String propertiesPath) {
+
+ try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
+
+ LOG.debug("propertiesPath: " + propertiesPath);
+ this.properties = (Properties) properties.clone();
+ this.properties.load(in);
+
+
+ String timeoutStr = this.properties.getProperty("timeout");
+ LOG.debug("timeoutStr: " + timeoutStr);
+
+ if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
+ timeout = parseTimeOutValue(timeoutStr);
+ }
+
+ String fetchPauseStr = this.properties.getProperty("fetchPause");
+ LOG.debug("fetchPause(Str): " + fetchPauseStr);
+ if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
+ fetchPause = parseFetchPause(fetchPauseStr);
+ }
+ LOG.debug("fetchPause: " + fetchPause);
+
+
+ this.consumer = MRClientFactory.createConsumer(propertiesPath);
+ ready = true;
+ } catch (Exception e) {
+ LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
+ }
+ }
+
+ private int parseTimeOutValue(String timeoutStr) {
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
+ }
+ return timeout;
+ }
+
+ private int parseFetchPause(String fetchPauseStr) {
+ try {
+ return Integer.parseInt(fetchPauseStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
+ }
+ return fetchPause;
+ }
+
+
+ @Override
+ public void run() {
+ if (ready) {
+
+ running = true;
+
+ while (running) {
+
+ try {
+ boolean noData = true;
+ consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
+ for (String msg : consumerResponse.getActualMessages()) {
+ noData = false;
+ LOG.info(name + " received ActualMessage from DMaaP:\n"+msg);
+ processMsg(msg);
+ }
+
+ if (noData) {
+ LOG.info(name + " received ResponseCode: " + consumerResponse.getResponseCode());
+ LOG.info(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
+ pauseThread();
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception reading from DMaaP", e);
+ running = false;
+ }
+
+
+ }
+ }
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ abstract public void processMsg(String msg) throws InvalidMessageException;
+}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
index 06e8ebe9..6c90c719 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
@@ -21,9 +21,6 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -32,9 +29,12 @@ import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
-public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
+public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumerImpl {
private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
index 53fb6db0..04f520bd 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SdncLcmDmaapConsumer extends SdncDmaapConsumer {
+public class SdncLcmDmaapConsumer extends SdncDmaapConsumerImpl {
private static final Logger LOG = LoggerFactory.getLogger(SdncLcmDmaapConsumer.class);