diff options
9 files changed, 383 insertions, 146 deletions
diff --git a/dmaap-listener/pom.xml b/dmaap-listener/pom.xml index 4dc12156..0d781698 100755 --- a/dmaap-listener/pom.xml +++ b/dmaap-listener/pom.xml @@ -30,6 +30,17 @@ <SWM_VERSION>${project.version}-${build.number}</SWM_VERSION> </properties> + <dependencyManagement> + <dependencies> + <!-- dmaapClient needs this version of this jar --> + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + <version>2.1</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> @@ -105,8 +116,8 @@ <version>2.5.1</version> <inherited>true</inherited> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin> diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java index a8336342..57fcd880 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java @@ -24,7 +24,7 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DummyDmaapConsumer extends SdncDmaapConsumer { +public class DummyDmaapConsumer extends SdncDmaapConsumerImpl { private static final Logger LOG = LoggerFactory .getLogger(DummyDmaapConsumer.class); diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java new file mode 100644 index 00000000..234a2026 --- /dev/null +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java @@ -0,0 +1,194 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.northbound.dmaapclient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.Invocation.Builder; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * jax-rs based client to build message router consumers + */ +public class MessageRouterHttpClient implements SdncDmaapConsumer { + private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClient.class); + + protected Boolean isReady = false; + protected Boolean isRunning = false; + protected Client client; + protected URI uri; + protected Invocation getMessages; + protected Integer fetchPause; + protected Properties properties; + protected final String DEFAULT_CONNECT_TIMEOUT_SECONDS = "30"; + protected final String DEFAULT_READ_TIMEOUT_MINUTES = "3"; + protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000"; + protected final String DEFAULT_LIMIT = null; + + public MessageRouterHttpClient() { + + } + + @Override + public void run() { + if (isReady) { + isRunning = true; + while (isRunning) { + try { + Response response = getMessages.invoke(); + Log.info("GET " + uri + " returned http status " + response.getStatus()); + String entity = response.readEntity(String.class); + if (entity.contains("{")) { + // Get rid of opening [" + entity = entity.substring(2); + // Get rid of closing "] + entity = entity.substring(0, entity.length() - 2); + // This replacement effectively un-escapes the JSON + for (String message : entity.split("\",\"")) { + try { + processMsg(message.replace("\\\"", "\"")); + } catch (InvalidMessageException e) { + Log.error("Message could not be processed", e); + } + } + } else { + Log.info("Entity doesn't appear to contain JSON elements"); + } + } catch (Exception e) { + Log.error("GET " + uri + " failed.", e); + } finally { + Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again."); + try { + Thread.sleep(fetchPause); + } catch (InterruptedException e) { + Log.error("Could not sleep thread", e); + } + } + } + } + } + + @Override + public void init(Properties baseProperties, String consumerPropertiesPath) { + try { + baseProperties.load(new FileInputStream(new File(consumerPropertiesPath))); + this.properties = baseProperties; + String username = baseProperties.getProperty("username"); + String password = baseProperties.getProperty("password"); + String topic = baseProperties.getProperty("topic"); + String group = baseProperties.getProperty("group"); + String host = baseProperties.getProperty("host"); + String id = baseProperties.getProperty("id"); + + String filter = baseProperties.getProperty("filter"); + if (filter != null) { + if (filter.length() > 0) { + filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name()); + } else { + filter = null; + } + } + + String limitString = baseProperties.getProperty("limit", DEFAULT_LIMIT); + Integer limit = null; + if (limitString != null && limitString.length() > 0) { + limit = Integer.valueOf(limitString); + } + + Integer timeoutQueryParamValue = + Integer.valueOf(baseProperties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE)); + Integer connectTimeoutSeconds = Integer + .valueOf(baseProperties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS)); + Integer readTimeoutMinutes = + Integer.valueOf(baseProperties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES)); + + String authorizationString = buildAuthorizationString(username, password); + this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter); + this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes); + Builder builder = + client.target(uri).request("application/json").header("Authorization", authorizationString); + this.getMessages = builder.buildGet(); + this.fetchPause = Integer.valueOf(baseProperties.getProperty("fetchPause")); + this.isReady = true; + } catch (FileNotFoundException e) { + Log.error("FileNotFoundException while reading consumer properties", e); + } catch (IOException e) { + Log.error("IOException while reading consumer properties", e); + } + } + + @Override + public void processMsg(String msg) throws InvalidMessageException { + System.out.println(msg); + } + + @Override + public boolean isReady() { + return isReady; + } + + @Override + public boolean isRunning() { + return isRunning; + } + + protected String buildAuthorizationString(String userName, String password) { + String basicAuthString = userName + ":" + password; + basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); + return "Basic " + basicAuthString; + } + + protected Client getClient(Integer connectTimeoutSeconds, Integer readTimeoutMinutes) { + ClientBuilder clientBuilder = ClientBuilder.newBuilder(); + clientBuilder.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS); + clientBuilder.readTimeout(readTimeoutMinutes, TimeUnit.MINUTES); + return clientBuilder.build(); + } + + protected URI buildUri(String topic, String consumerGroup, String consumerId, String host, Integer timeout, + Integer limit, String filter) { + UriBuilder builder = UriBuilder.fromPath("http://" + host + "/events/{topic}/{consumerGroup}/{consumderId}"); + builder.queryParam("timeout", timeout); + if (limit != null) { + builder.queryParam("limit", limit); + } + if (filter != null) { + builder.queryParam("filter", filter); + } + return builder.build(topic, consumerGroup, consumerId); + } + +} diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java index 0e12dfa2..2c4de710 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java @@ -21,9 +21,6 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -34,14 +31,15 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.Properties; - import org.apache.velocity.VelocityContext; import org.apache.velocity.app.VelocityEngine; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; -public class SdncAaiDmaapConsumer extends SdncDmaapConsumer { +public class SdncAaiDmaapConsumer extends SdncDmaapConsumerImpl { private static final Logger LOG = LoggerFactory.getLogger(SdncAaiDmaapConsumer.class); private static final String SDNC_ENDPOINT = "SDNC.endpoint"; diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java index 03560d30..7b68ceb6 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java @@ -5,19 +5,14 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.sql.SQLException; -import java.util.Iterator; -import java.util.Map; import java.util.Properties; - import org.onap.ccsdk.sli.core.dblib.DBResourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -public class SdncDhcpEventConsumer extends SdncDmaapConsumer { +public class SdncDhcpEventConsumer extends SdncDmaapConsumerImpl { private static final Logger LOG = LoggerFactory.getLogger(SdncDhcpEventConsumer.class); private static final String MAC_ADDR_TAG = "macaddr"; diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java index 2b416e7d..3fc769d3 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * openECOMP : SDN-C * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,134 +21,14 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.response.MRConsumerResponse; -import java.io.File; -import java.io.FileInputStream; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public abstract class SdncDmaapConsumer implements Runnable { +public abstract interface SdncDmaapConsumer extends Runnable { + public abstract void init(Properties baseProperties, String consumerPropertiesPath); - private static final Logger LOG = LoggerFactory - .getLogger(SdncDmaapConsumer.class); + public abstract void processMsg(String msg) throws InvalidMessageException; - private Properties properties = null; - private MRConsumer consumer = null; - private MRConsumerResponse consumerResponse = null; - private boolean running = false; - private boolean ready = false; - private int fetchPause = 5000; // Default pause between fetch - 5 seconds - private int timeout = 15000; // Default timeout - 15 seconds + public abstract boolean isReady(); - public SdncDmaapConsumer() { - - } - - public SdncDmaapConsumer(Properties properties, String propertiesPath) { - init(properties, propertiesPath); - } - - public boolean isReady() { - return ready; - } - - public boolean isRunning() { - return running; - } - - public String getProperty(String name) { - return properties.getProperty(name, ""); - } - - public void init(Properties properties, String propertiesPath) { - - try (FileInputStream in = new FileInputStream(new File(propertiesPath))) { - - LOG.debug("propertiesPath: " + propertiesPath); - this.properties = (Properties) properties.clone(); - this.properties.load(in); - - - String timeoutStr = this.properties.getProperty("timeout"); - LOG.debug("timeoutStr: " + timeoutStr); - - if ((timeoutStr != null) && (timeoutStr.length() > 0)) { - timeout = parseTimeOutValue(timeoutStr); - } - - String fetchPauseStr = this.properties.getProperty("fetchPause"); - LOG.debug("fetchPause(Str): " + fetchPauseStr); - if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) { - fetchPause = parseFetchPause(fetchPauseStr); - } - LOG.debug("fetchPause: " + fetchPause); - - - this.consumer = MRClientFactory.createConsumer(propertiesPath); - ready = true; - } catch (Exception e) { - LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e); - } - } - - private int parseTimeOutValue(String timeoutStr) { - try { - return Integer.parseInt(timeoutStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")"); - } - return timeout; - } - - private int parseFetchPause(String fetchPauseStr) { - try { - return Integer.parseInt(fetchPauseStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")"); - } - return fetchPause; - } - - - @Override - public void run() { - if (ready) { - - running = true; - - while (running) { - - try { - boolean noData = true; - consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1); - for (String msg : consumerResponse.getActualMessages()) { - noData = false; - LOG.info("Received message from DMaaP:\n" + msg); - processMsg(msg); - } - - if (noData) { - pauseThread(); - } - } catch (Exception e) { - LOG.error("Caught exception reading from DMaaP", e); - running = false; - } - } - } - } - - private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause)); - Thread.sleep(fetchPause); - } else { - LOG.info("No data received from fetch. No fetch pause specified - retrying immediately"); - } - } - - abstract public void processMsg(String msg) throws InvalidMessageException; + public abstract boolean isRunning(); } diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java new file mode 100644 index 00000000..ddd87132 --- /dev/null +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java @@ -0,0 +1,159 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.northbound.dmaapclient; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +public abstract class SdncDmaapConsumerImpl implements SdncDmaapConsumer { + + private static final Logger LOG = LoggerFactory + .getLogger(SdncDmaapConsumer.class); + + private final String name = this.getClass().getSimpleName(); + private Properties properties = null; + private MRConsumer consumer = null; + private MRConsumerResponse consumerResponse = null; + private boolean running = false; + private boolean ready = false; + private int fetchPause = 5000; // Default pause between fetch - 5 seconds + private int timeout = 15000; // Default timeout - 15 seconds + + public SdncDmaapConsumerImpl() { + + } + + public SdncDmaapConsumerImpl(Properties properties, String propertiesPath) { + init(properties, propertiesPath); + } + + public boolean isReady() { + return ready; + } + + public boolean isRunning() { + return running; + } + + public String getProperty(String name) { + return properties.getProperty(name, ""); + } + + public void init(Properties properties, String propertiesPath) { + + try (FileInputStream in = new FileInputStream(new File(propertiesPath))) { + + LOG.debug("propertiesPath: " + propertiesPath); + this.properties = (Properties) properties.clone(); + this.properties.load(in); + + + String timeoutStr = this.properties.getProperty("timeout"); + LOG.debug("timeoutStr: " + timeoutStr); + + if ((timeoutStr != null) && (timeoutStr.length() > 0)) { + timeout = parseTimeOutValue(timeoutStr); + } + + String fetchPauseStr = this.properties.getProperty("fetchPause"); + LOG.debug("fetchPause(Str): " + fetchPauseStr); + if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) { + fetchPause = parseFetchPause(fetchPauseStr); + } + LOG.debug("fetchPause: " + fetchPause); + + + this.consumer = MRClientFactory.createConsumer(propertiesPath); + ready = true; + } catch (Exception e) { + LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e); + } + } + + private int parseTimeOutValue(String timeoutStr) { + try { + return Integer.parseInt(timeoutStr); + } catch (NumberFormatException e) { + LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")"); + } + return timeout; + } + + private int parseFetchPause(String fetchPauseStr) { + try { + return Integer.parseInt(fetchPauseStr); + } catch (NumberFormatException e) { + LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")"); + } + return fetchPause; + } + + + @Override + public void run() { + if (ready) { + + running = true; + + while (running) { + + try { + boolean noData = true; + consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1); + for (String msg : consumerResponse.getActualMessages()) { + noData = false; + LOG.info(name + " received ActualMessage from DMaaP:\n"+msg); + processMsg(msg); + } + + if (noData) { + LOG.info(name + " received ResponseCode: " + consumerResponse.getResponseCode()); + LOG.info(name + " received ResponseMessage: " + consumerResponse.getResponseMessage()); + pauseThread(); + } + } catch (Exception e) { + LOG.error("Caught exception reading from DMaaP", e); + running = false; + } + + + } + } + } + + private void pauseThread() throws InterruptedException { + if (fetchPause > 0) { + LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause)); + Thread.sleep(fetchPause); + } else { + LOG.info("No data received from fetch. No fetch pause specified - retrying immediately"); + } + } + + abstract public void processMsg(String msg) throws InvalidMessageException; +} diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java index 06e8ebe9..6c90c719 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java @@ -21,9 +21,6 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -32,9 +29,12 @@ import java.util.Iterator; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; -public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer { +public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumerImpl { private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class); diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java index 53fb6db0..04f520bd 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SdncLcmDmaapConsumer extends SdncDmaapConsumer { +public class SdncLcmDmaapConsumer extends SdncDmaapConsumerImpl { private static final Logger LOG = LoggerFactory.getLogger(SdncLcmDmaapConsumer.class); |