diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java')
-rw-r--r-- | appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java | 43 |
1 files changed, 19 insertions, 24 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 822dfdd22..5149e0c32 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -24,14 +24,13 @@ package org.onap.appc.adapter.messaging.dmaap.http; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.methods.CloseableHttpResponse; @@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer; public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer private List<String> urls; private String filter; - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter) { + this(hosts, topicName, consumerName, consumerId, filter, null, null); } public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter, String user, String password) { + urls = new ArrayList<>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); @@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public void updateCredentials(String user, String pass) { - logger.debug(String.format("Setting auth to %s for %s", user, this.toString())); + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); this.setBasicAuth(user, pass); } @Override public List<String> fetch(int waitMs, int limit) { - logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); List<String> out = new ArrayList<>(); try { List<NameValuePair> urlParams = new ArrayList<>(); @@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer builder.setParameters(urlParams); URI uri = builder.build(); - logger.info(String.format("GET %s", uri)); + LOG.info(String.format("GET %s", uri)); HttpGet request = getReq(uri, waitMs); CloseableHttpResponse response = getClient().execute(request); int httpStatus = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); - String body = (entity != null) ? EntityUtils.toString(entity) : null; + String body = (entity != null) ? entityToString(entity) : null; - logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, + LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, body != null ? body.length() : "null")); response.close(); if (httpStatus == 200 && body != null) { JSONArray json = new JSONArray(body); - logger.info(String.format("Got %d messages from DMaaP", json.length())); + LOG.info(String.format("Got %d messages from DMaaP", json.length())); for (int i = 0; i < json.length(); i++) { out.add(json.getString(i)); } } else { - logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); + LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); sleep(waitMs); } } catch (Exception e) { if (urls.size() > 1) { String failedUrl = urls.remove(0); urls.add(failedUrl); - logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, urls.get(0))); } - logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); + LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); sleep(waitMs); } - return out; } @@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer return String.format("Consumer listening to [%s]", hostStr); } - @Override - public void useHttps(boolean yes) { + String entityToString(HttpEntity entity) throws IOException { + return EntityUtils.toString(entity); } private void sleep(int ms) { - logger.info(String.format("Sleeping for %ds after failed request", ms / 1000)); + LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); try { Thread.sleep(ms); } catch (InterruptedException e1) { - logger.error("Interrupted while sleeping", e1); + LOG.error("Interrupted while sleeping", e1); Thread.currentThread().interrupt(); } } - @Override - public void close() { - } - } |