summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java43
1 files changed, 19 insertions, 24 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index 822dfdd22..5149e0c32 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -24,14 +24,13 @@
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer;
public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
private List<String> urls;
private String filter;
-
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
+
this(hosts, topicName, consumerName, consumerId, filter, null, null);
}
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter, String user, String password) {
+
urls = new ArrayList<>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
@@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public void updateCredentials(String user, String pass) {
- logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
this.setBasicAuth(user, pass);
}
@Override
public List<String> fetch(int waitMs, int limit) {
- logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
List<String> out = new ArrayList<>();
try {
List<NameValuePair> urlParams = new ArrayList<>();
@@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
builder.setParameters(urlParams);
URI uri = builder.build();
- logger.info(String.format("GET %s", uri));
+ LOG.info(String.format("GET %s", uri));
HttpGet request = getReq(uri, waitMs);
CloseableHttpResponse response = getClient().execute(request);
int httpStatus = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- String body = (entity != null) ? EntityUtils.toString(entity) : null;
+ String body = (entity != null) ? entityToString(entity) : null;
- logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
body != null ? body.length() : "null"));
response.close();
if (httpStatus == 200 && body != null) {
JSONArray json = new JSONArray(body);
- logger.info(String.format("Got %d messages from DMaaP", json.length()));
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
for (int i = 0; i < json.length(); i++) {
out.add(json.getString(i));
}
} else {
- logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
sleep(waitMs);
}
} catch (Exception e) {
if (urls.size() > 1) {
String failedUrl = urls.remove(0);
urls.add(failedUrl);
- logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
urls.get(0)));
}
- logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
sleep(waitMs);
}
-
return out;
}
@@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
return String.format("Consumer listening to [%s]", hostStr);
}
- @Override
- public void useHttps(boolean yes) {
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
}
private void sleep(int ms) {
- logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
try {
Thread.sleep(ms);
} catch (InterruptedException e1) {
- logger.error("Interrupted while sleeping", e1);
+ LOG.error("Interrupted while sleeping", e1);
Thread.currentThread().interrupt();
}
}
- @Override
- public void close() {
- }
-
}