summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java39
1 files changed, 18 insertions, 21 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index 6be34553c..822dfdd22 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -9,15 +9,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
@@ -44,7 +44,7 @@ import org.onap.appc.adapter.message.Consumer;
public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,8 +54,6 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
private List<String> urls;
private String filter;
- private boolean useHttps = false;
-
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
@@ -64,7 +62,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter, String user, String password) {
- urls = new ArrayList<String>();
+ urls = new ArrayList<>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
}
@@ -74,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public void updateCredentials(String user, String pass) {
- LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
this.setBasicAuth(user, pass);
}
@Override
public List<String> fetch(int waitMs, int limit) {
- LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
List<String> out = new ArrayList<>();
try {
List<NameValuePair> urlParams = new ArrayList<>();
@@ -94,7 +92,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
builder.setParameters(urlParams);
URI uri = builder.build();
- LOG.info(String.format("GET %s", uri));
+ logger.info(String.format("GET %s", uri));
HttpGet request = getReq(uri, waitMs);
CloseableHttpResponse response = getClient().execute(request);
@@ -102,28 +100,28 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
HttpEntity entity = response.getEntity();
String body = (entity != null) ? EntityUtils.toString(entity) : null;
- LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
- (body != null ? body.length() : "null")));
+ logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ body != null ? body.length() : "null"));
response.close();
if (httpStatus == 200 && body != null) {
JSONArray json = new JSONArray(body);
- LOG.info(String.format("Got %d messages from DMaaP", json.length()));
+ logger.info(String.format("Got %d messages from DMaaP", json.length()));
for (int i = 0; i < json.length(); i++) {
out.add(json.getString(i));
}
} else {
- LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
sleep(waitMs);
}
} catch (Exception e) {
if (urls.size() > 1) {
String failedUrl = urls.remove(0);
urls.add(failedUrl);
- LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
urls.get(0)));
}
- LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
sleep(waitMs);
}
@@ -137,27 +135,26 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public String toString() {
- String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0);
+ String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
return String.format("Consumer listening to [%s]", hostStr);
}
@Override
public void useHttps(boolean yes) {
- useHttps = yes;
}
private void sleep(int ms) {
- LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
try {
Thread.sleep(ms);
} catch (InterruptedException e1) {
- LOG.error("Interrupted while sleeping");
+ logger.error("Interrupted while sleeping", e1);
+ Thread.currentThread().interrupt();
}
}
@Override
public void close() {
- // Nothing to do
}
}