diff options
author | wejs <maciej.wejs@nokia.com> | 2018-02-23 15:59:31 +0100 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2018-03-01 08:40:41 +0000 |
commit | 1aeaed6f086ebd2c6621867146112e4f2f675429 (patch) | |
tree | b9a1d21676419888869b784ce922b140c429d6fd /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org | |
parent | 1e65cff150f78124da0a7d2df84d1e92fbd00ac3 (diff) |
Fixes in "appc-dmaap-adapter-bundle"
Correction of Sonar and CheckStyle issues
Added JUnit test coverage.
Change-Id: I119c7555e33e09c87b572ff0b63f55e2884be75f
Issue-ID: APPC-646
Signed-off-by: wejs <maciej.wejs@nokia.com>
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org')
4 files changed, 107 insertions, 92 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java new file mode 100644 index 000000000..f670d6cdd --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.http; + +class AuthenticationException extends Exception { + + public AuthenticationException(String message) { + super(message); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java index e67007f64..0749eb185 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -34,79 +34,74 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; -public class CommonHttpClient { +abstract class CommonHttpClient { - public static final int HTTPS_PORT = 3905; + private static final int HTTP_PORT = 3904; + private static final int HTTPS_PORT = 3905; + private static final int TIMEOUT_OFFSET = 5000; - private String AUTH_STR; + private String authStr; protected void setBasicAuth(String username, String password) { if (username != null && password != null) { String plain = String.format("%s:%s", username, password); - AUTH_STR = Base64.encodeBase64String(plain.getBytes()); + authStr = Base64.encodeBase64String(plain.getBytes()); } else { - AUTH_STR = null; + authStr = null; } } - public HttpGet getReq(URI uri, int timeoutMs) throws Exception { - if (AUTH_STR == null) { - throw new Exception("All DMaaP requests require authentication and none was provided."); + protected HttpGet getReq(URI uri, int timeoutMs) throws AuthenticationException { + if (authStr == null) { + throw new AuthenticationException("All DMaaP requests require authentication and none was provided."); } HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + out.setHeader("Authorization", String.format("Basic %s", authStr)); out.setConfig(getConfig(timeoutMs)); return out; } - public HttpPost postReq(String url) throws Exception { - if (AUTH_STR == null) { - throw new Exception("All DMaaP requests require authentication and none was provided."); + protected HttpPost postReq(String url) throws AuthenticationException { + if (authStr == null) { + throw new AuthenticationException("All DMaaP requests require authentication and none was provided."); } HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + out.setHeader("Authorization", String.format("Basic %s", authStr)); out.setConfig(getConfig(0)); return out; } private RequestConfig getConfig(int timeoutMs) { Builder builder = RequestConfig.custom(); - builder.setSocketTimeout(timeoutMs + 5000); + builder.setSocketTimeout(timeoutMs + TIMEOUT_OFFSET); return builder.build(); } - public CloseableHttpClient getClient() { - return getClient(false); - } - - public CloseableHttpClient getClient(boolean useHttps) { + protected CloseableHttpClient getClient() { return HttpClientBuilder.create().build(); } - public String formatHostString(String host) { + protected String formatHostString(String host) { return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT))); } - public String formatHostString(String host, boolean useHttps) { + private String formatHostString(String host, boolean useHttps) { // Trim trailing slash String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host; - boolean hasProto = out.startsWith("http"); + boolean hasProtocol = out.startsWith("http"); boolean hasPort = out.contains(":"); // Add protocol - if (!hasProto) { + if (!hasProtocol) { out = String.format("%s%s", (useHttps) ? "https://" : "http://", out); } - // Add port if (!hasPort) { - out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904); + out = String.format("%s:%d", out, (useHttps) ? HTTPS_PORT : HTTP_PORT); } - return out; - } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 822dfdd22..5149e0c32 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -24,14 +24,13 @@ package org.onap.appc.adapter.messaging.dmaap.http; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.methods.CloseableHttpResponse; @@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer; public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer private List<String> urls; private String filter; - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter) { + this(hosts, topicName, consumerName, consumerId, filter, null, null); } public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter, String user, String password) { + urls = new ArrayList<>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); @@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public void updateCredentials(String user, String pass) { - logger.debug(String.format("Setting auth to %s for %s", user, this.toString())); + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); this.setBasicAuth(user, pass); } @Override public List<String> fetch(int waitMs, int limit) { - logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); List<String> out = new ArrayList<>(); try { List<NameValuePair> urlParams = new ArrayList<>(); @@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer builder.setParameters(urlParams); URI uri = builder.build(); - logger.info(String.format("GET %s", uri)); + LOG.info(String.format("GET %s", uri)); HttpGet request = getReq(uri, waitMs); CloseableHttpResponse response = getClient().execute(request); int httpStatus = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); - String body = (entity != null) ? EntityUtils.toString(entity) : null; + String body = (entity != null) ? entityToString(entity) : null; - logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, + LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, body != null ? body.length() : "null")); response.close(); if (httpStatus == 200 && body != null) { JSONArray json = new JSONArray(body); - logger.info(String.format("Got %d messages from DMaaP", json.length())); + LOG.info(String.format("Got %d messages from DMaaP", json.length())); for (int i = 0; i < json.length(); i++) { out.add(json.getString(i)); } } else { - logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); + LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); sleep(waitMs); } } catch (Exception e) { if (urls.size() > 1) { String failedUrl = urls.remove(0); urls.add(failedUrl); - logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, urls.get(0))); } - logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); + LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); sleep(waitMs); } - return out; } @@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer return String.format("Consumer listening to [%s]", hostStr); } - @Override - public void useHttps(boolean yes) { + String entityToString(HttpEntity entity) throws IOException { + return EntityUtils.toString(entity); } private void sleep(int ms) { - logger.info(String.format("Sleeping for %ds after failed request", ms / 1000)); + LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); try { Thread.sleep(ms); } catch (InterruptedException e1) { - logger.error("Interrupted while sleeping", e1); + LOG.error("Interrupted while sleeping", e1); Thread.currentThread().interrupt(); } } - @Override - public void close() { - } - } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index 74c0c26b2..fe22ea10b 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -24,16 +24,14 @@ package org.onap.appc.adapter.messaging.dmaap.http; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -41,7 +39,7 @@ import org.onap.appc.adapter.message.Producer; public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); private static final String CONTENT_TYPE = "application/cambria"; private static final String URL_TEMPLATE = "%s/events/%s"; @@ -49,19 +47,20 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer private List<String> hosts; private Set<String> topics; - private boolean useHttps = false; + public HttpDmaapProducerImpl() { + //for test purposes + } public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { - hosts = new ArrayList<>(); topics = new HashSet<>(); topics.add(topicName); + hosts = new ArrayList<>(); for (String host : urls) { hosts.add(formatHostString(host)); } } - @Override public void updateCredentials(String user, String pass) { LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); @@ -70,32 +69,16 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer @Override public boolean post(String partition, String data) { - int sent = 0; + long sent = 0; try { HttpPost request = postReq(null); request.setHeader("Content-Type", CONTENT_TYPE); request.setEntity(new StringEntity(bodyLine(partition, data))); - for (String topic : topics) { - String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); - try { - request.setURI(new URI(uriStr)); - CloseableHttpResponse response = getClient().execute(request); - if (response.getStatusLine().getStatusCode() == 200) { - sent++; - } - response.close(); - } catch (Exception sendEx) { - LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), - sendEx); - if (hosts.size() > 1) { - String failedUrl = hosts.remove(0); - hosts.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", - failedUrl, hosts.get(0))); - } - } - } + sent = topics.stream() + .filter(topic -> sendRequest(request, topic)) + .count(); + } catch (Exception buildEx) { LOG.error( String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s", @@ -105,26 +88,40 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer return sent == topics.size(); } - @Override - public void useHttps(boolean yes) { - useHttps = yes; + private boolean sendRequest(HttpPost request, String topic) { + boolean successful = false; + String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); + try { + request.setURI(new URI(uriStr)); + CloseableHttpResponse response = getClient().execute(request); + if (response.getStatusLine().getStatusCode() == 200) { + successful = true; + } + response.close(); + } catch (Exception sendEx) { + LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), + sendEx); + if (hosts.size() > 1) { + String failedUrl = hosts.remove(0); + hosts.add(failedUrl); + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", + failedUrl, hosts.get(0))); + } + } + return successful; } /** * Format the body for the application/cambria content type with no partitioning. See * - * @param msg + * @param message * The message body to format * @return A string in the application/cambria content type */ - private String bodyLine(String partition, String msg) { - String p = (partition == null) ? "" : partition; - String m = (msg == null) ? "" : msg; - return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); + private String bodyLine(String partition, String message) { + String prt = (partition == null) ? "" : partition; + String msg = (message == null) ? "" : message; + return String.format("%d.%d.%s%s", prt.length(), msg.length(), prt, msg); } - @Override - public void close() { - // Nothing to do - } } |