aboutsummaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org
diff options
context:
space:
mode:
authorwejs <maciej.wejs@nokia.com>2018-02-23 15:59:31 +0100
committerPatrick Brady <pb071s@att.com>2018-03-01 08:40:41 +0000
commit1aeaed6f086ebd2c6621867146112e4f2f675429 (patch)
treeb9a1d21676419888869b784ce922b140c429d6fd /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org
parent1e65cff150f78124da0a7d2df84d1e92fbd00ac3 (diff)
Fixes in "appc-dmaap-adapter-bundle"
Correction of Sonar and CheckStyle issues Added JUnit test coverage. Change-Id: I119c7555e33e09c87b572ff0b63f55e2884be75f Issue-ID: APPC-646 Signed-off-by: wejs <maciej.wejs@nokia.com>
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java28
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java49
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java43
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java79
4 files changed, 107 insertions, 92 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
new file mode 100644
index 000000000..f670d6cdd
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+class AuthenticationException extends Exception {
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
index e67007f64..0749eb185 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -34,79 +34,74 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
-public class CommonHttpClient {
+abstract class CommonHttpClient {
- public static final int HTTPS_PORT = 3905;
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+ private static final int TIMEOUT_OFFSET = 5000;
- private String AUTH_STR;
+ private String authStr;
protected void setBasicAuth(String username, String password) {
if (username != null && password != null) {
String plain = String.format("%s:%s", username, password);
- AUTH_STR = Base64.encodeBase64String(plain.getBytes());
+ authStr = Base64.encodeBase64String(plain.getBytes());
} else {
- AUTH_STR = null;
+ authStr = null;
}
}
- public HttpGet getReq(URI uri, int timeoutMs) throws Exception {
- if (AUTH_STR == null) {
- throw new Exception("All DMaaP requests require authentication and none was provided.");
+ protected HttpGet getReq(URI uri, int timeoutMs) throws AuthenticationException {
+ if (authStr == null) {
+ throw new AuthenticationException("All DMaaP requests require authentication and none was provided.");
}
HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
out.setConfig(getConfig(timeoutMs));
return out;
}
- public HttpPost postReq(String url) throws Exception {
- if (AUTH_STR == null) {
- throw new Exception("All DMaaP requests require authentication and none was provided.");
+ protected HttpPost postReq(String url) throws AuthenticationException {
+ if (authStr == null) {
+ throw new AuthenticationException("All DMaaP requests require authentication and none was provided.");
}
HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
out.setConfig(getConfig(0));
return out;
}
private RequestConfig getConfig(int timeoutMs) {
Builder builder = RequestConfig.custom();
- builder.setSocketTimeout(timeoutMs + 5000);
+ builder.setSocketTimeout(timeoutMs + TIMEOUT_OFFSET);
return builder.build();
}
- public CloseableHttpClient getClient() {
- return getClient(false);
- }
-
- public CloseableHttpClient getClient(boolean useHttps) {
+ protected CloseableHttpClient getClient() {
return HttpClientBuilder.create().build();
}
- public String formatHostString(String host) {
+ protected String formatHostString(String host) {
return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
}
- public String formatHostString(String host, boolean useHttps) {
+ private String formatHostString(String host, boolean useHttps) {
// Trim trailing slash
String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
- boolean hasProto = out.startsWith("http");
+ boolean hasProtocol = out.startsWith("http");
boolean hasPort = out.contains(":");
// Add protocol
- if (!hasProto) {
+ if (!hasProtocol) {
out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
}
-
// Add port
if (!hasPort) {
- out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904);
+ out = String.format("%s:%d", out, (useHttps) ? HTTPS_PORT : HTTP_PORT);
}
-
return out;
-
}
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index 822dfdd22..5149e0c32 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -24,14 +24,13 @@
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer;
public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
private List<String> urls;
private String filter;
-
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
+
this(hosts, topicName, consumerName, consumerId, filter, null, null);
}
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter, String user, String password) {
+
urls = new ArrayList<>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
@@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public void updateCredentials(String user, String pass) {
- logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
this.setBasicAuth(user, pass);
}
@Override
public List<String> fetch(int waitMs, int limit) {
- logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
List<String> out = new ArrayList<>();
try {
List<NameValuePair> urlParams = new ArrayList<>();
@@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
builder.setParameters(urlParams);
URI uri = builder.build();
- logger.info(String.format("GET %s", uri));
+ LOG.info(String.format("GET %s", uri));
HttpGet request = getReq(uri, waitMs);
CloseableHttpResponse response = getClient().execute(request);
int httpStatus = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- String body = (entity != null) ? EntityUtils.toString(entity) : null;
+ String body = (entity != null) ? entityToString(entity) : null;
- logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
body != null ? body.length() : "null"));
response.close();
if (httpStatus == 200 && body != null) {
JSONArray json = new JSONArray(body);
- logger.info(String.format("Got %d messages from DMaaP", json.length()));
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
for (int i = 0; i < json.length(); i++) {
out.add(json.getString(i));
}
} else {
- logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
sleep(waitMs);
}
} catch (Exception e) {
if (urls.size() > 1) {
String failedUrl = urls.remove(0);
urls.add(failedUrl);
- logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
urls.get(0)));
}
- logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
sleep(waitMs);
}
-
return out;
}
@@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
return String.format("Consumer listening to [%s]", hostStr);
}
- @Override
- public void useHttps(boolean yes) {
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
}
private void sleep(int ms) {
- logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
try {
Thread.sleep(ms);
} catch (InterruptedException e1) {
- logger.error("Interrupted while sleeping", e1);
+ LOG.error("Interrupted while sleeping", e1);
Thread.currentThread().interrupt();
}
}
- @Override
- public void close() {
- }
-
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
index 74c0c26b2..fe22ea10b 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -24,16 +24,14 @@
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -41,7 +39,7 @@ import org.onap.appc.adapter.message.Producer;
public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
private static final String CONTENT_TYPE = "application/cambria";
private static final String URL_TEMPLATE = "%s/events/%s";
@@ -49,19 +47,20 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
private List<String> hosts;
private Set<String> topics;
- private boolean useHttps = false;
+ public HttpDmaapProducerImpl() {
+ //for test purposes
+ }
public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
- hosts = new ArrayList<>();
topics = new HashSet<>();
topics.add(topicName);
+ hosts = new ArrayList<>();
for (String host : urls) {
hosts.add(formatHostString(host));
}
}
-
@Override
public void updateCredentials(String user, String pass) {
LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
@@ -70,32 +69,16 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
@Override
public boolean post(String partition, String data) {
- int sent = 0;
+ long sent = 0;
try {
HttpPost request = postReq(null);
request.setHeader("Content-Type", CONTENT_TYPE);
request.setEntity(new StringEntity(bodyLine(partition, data)));
- for (String topic : topics) {
- String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
- try {
- request.setURI(new URI(uriStr));
- CloseableHttpResponse response = getClient().execute(request);
- if (response.getStatusLine().getStatusCode() == 200) {
- sent++;
- }
- response.close();
- } catch (Exception sendEx) {
- LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
- sendEx);
- if (hosts.size() > 1) {
- String failedUrl = hosts.remove(0);
- hosts.add(failedUrl);
- LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
- failedUrl, hosts.get(0)));
- }
- }
- }
+ sent = topics.stream()
+ .filter(topic -> sendRequest(request, topic))
+ .count();
+
} catch (Exception buildEx) {
LOG.error(
String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
@@ -105,26 +88,40 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
return sent == topics.size();
}
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
+ private boolean sendRequest(HttpPost request, String topic) {
+ boolean successful = false;
+ String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
+ try {
+ request.setURI(new URI(uriStr));
+ CloseableHttpResponse response = getClient().execute(request);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ successful = true;
+ }
+ response.close();
+ } catch (Exception sendEx) {
+ LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
+ sendEx);
+ if (hosts.size() > 1) {
+ String failedUrl = hosts.remove(0);
+ hosts.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
+ failedUrl, hosts.get(0)));
+ }
+ }
+ return successful;
}
/**
* Format the body for the application/cambria content type with no partitioning. See
*
- * @param msg
+ * @param message
* The message body to format
* @return A string in the application/cambria content type
*/
- private String bodyLine(String partition, String msg) {
- String p = (partition == null) ? "" : partition;
- String m = (msg == null) ? "" : msg;
- return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
+ private String bodyLine(String partition, String message) {
+ String prt = (partition == null) ? "" : partition;
+ String msg = (message == null) ? "" : message;
+ return String.format("%d.%d.%s%s", prt.length(), msg.length(), prt, msg);
}
- @Override
- public void close() {
- // Nothing to do
- }
}