diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle')
-rw-r--r-- | appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java | 39 |
1 files changed, 18 insertions, 21 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 6be34553c..822dfdd22 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -9,15 +9,15 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * ============LICENSE_END========================================================= */ @@ -44,7 +44,7 @@ import org.onap.appc.adapter.message.Consumer; public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); + private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,8 +54,6 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer private List<String> urls; private String filter; - private boolean useHttps = false; - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter) { @@ -64,7 +62,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter, String user, String password) { - urls = new ArrayList<String>(); + urls = new ArrayList<>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); } @@ -74,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public void updateCredentials(String user, String pass) { - LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); + logger.debug(String.format("Setting auth to %s for %s", user, this.toString())); this.setBasicAuth(user, pass); } @Override public List<String> fetch(int waitMs, int limit) { - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); List<String> out = new ArrayList<>(); try { List<NameValuePair> urlParams = new ArrayList<>(); @@ -94,7 +92,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer builder.setParameters(urlParams); URI uri = builder.build(); - LOG.info(String.format("GET %s", uri)); + logger.info(String.format("GET %s", uri)); HttpGet request = getReq(uri, waitMs); CloseableHttpResponse response = getClient().execute(request); @@ -102,28 +100,28 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer HttpEntity entity = response.getEntity(); String body = (entity != null) ? EntityUtils.toString(entity) : null; - LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, - (body != null ? body.length() : "null"))); + logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, + body != null ? body.length() : "null")); response.close(); if (httpStatus == 200 && body != null) { JSONArray json = new JSONArray(body); - LOG.info(String.format("Got %d messages from DMaaP", json.length())); + logger.info(String.format("Got %d messages from DMaaP", json.length())); for (int i = 0; i < json.length(); i++) { out.add(json.getString(i)); } } else { - LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); + logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); sleep(waitMs); } } catch (Exception e) { if (urls.size() > 1) { String failedUrl = urls.remove(0); urls.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, + logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, urls.get(0))); } - LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); + logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); sleep(waitMs); } @@ -137,27 +135,26 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public String toString() { - String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0); + String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0); return String.format("Consumer listening to [%s]", hostStr); } @Override public void useHttps(boolean yes) { - useHttps = yes; } private void sleep(int ms) { - LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); + logger.info(String.format("Sleeping for %ds after failed request", ms / 1000)); try { Thread.sleep(ms); } catch (InterruptedException e1) { - LOG.error("Interrupted while sleeping"); + logger.error("Interrupted while sleeping", e1); + Thread.currentThread().interrupt(); } } @Override public void close() { - // Nothing to do } } |