summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java28
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java49
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java43
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java79
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java134
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java164
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java146
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Consumer.java30
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Producer.java24
9 files changed, 578 insertions, 119 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
new file mode 100644
index 000000000..f670d6cdd
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+class AuthenticationException extends Exception {
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
index e67007f64..0749eb185 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -34,79 +34,74 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
-public class CommonHttpClient {
+abstract class CommonHttpClient {
- public static final int HTTPS_PORT = 3905;
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+ private static final int TIMEOUT_OFFSET = 5000;
- private String AUTH_STR;
+ private String authStr;
protected void setBasicAuth(String username, String password) {
if (username != null && password != null) {
String plain = String.format("%s:%s", username, password);
- AUTH_STR = Base64.encodeBase64String(plain.getBytes());
+ authStr = Base64.encodeBase64String(plain.getBytes());
} else {
- AUTH_STR = null;
+ authStr = null;
}
}
- public HttpGet getReq(URI uri, int timeoutMs) throws Exception {
- if (AUTH_STR == null) {
- throw new Exception("All DMaaP requests require authentication and none was provided.");
+ protected HttpGet getReq(URI uri, int timeoutMs) throws AuthenticationException {
+ if (authStr == null) {
+ throw new AuthenticationException("All DMaaP requests require authentication and none was provided.");
}
HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
out.setConfig(getConfig(timeoutMs));
return out;
}
- public HttpPost postReq(String url) throws Exception {
- if (AUTH_STR == null) {
- throw new Exception("All DMaaP requests require authentication and none was provided.");
+ protected HttpPost postReq(String url) throws AuthenticationException {
+ if (authStr == null) {
+ throw new AuthenticationException("All DMaaP requests require authentication and none was provided.");
}
HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
out.setConfig(getConfig(0));
return out;
}
private RequestConfig getConfig(int timeoutMs) {
Builder builder = RequestConfig.custom();
- builder.setSocketTimeout(timeoutMs + 5000);
+ builder.setSocketTimeout(timeoutMs + TIMEOUT_OFFSET);
return builder.build();
}
- public CloseableHttpClient getClient() {
- return getClient(false);
- }
-
- public CloseableHttpClient getClient(boolean useHttps) {
+ protected CloseableHttpClient getClient() {
return HttpClientBuilder.create().build();
}
- public String formatHostString(String host) {
+ protected String formatHostString(String host) {
return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
}
- public String formatHostString(String host, boolean useHttps) {
+ private String formatHostString(String host, boolean useHttps) {
// Trim trailing slash
String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
- boolean hasProto = out.startsWith("http");
+ boolean hasProtocol = out.startsWith("http");
boolean hasPort = out.contains(":");
// Add protocol
- if (!hasProto) {
+ if (!hasProtocol) {
out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
}
-
// Add port
if (!hasPort) {
- out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904);
+ out = String.format("%s:%d", out, (useHttps) ? HTTPS_PORT : HTTP_PORT);
}
-
return out;
-
}
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index 822dfdd22..5149e0c32 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -24,14 +24,13 @@
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer;
public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
private List<String> urls;
private String filter;
-
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
+
this(hosts, topicName, consumerName, consumerId, filter, null, null);
}
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter, String user, String password) {
+
urls = new ArrayList<>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
@@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public void updateCredentials(String user, String pass) {
- logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
this.setBasicAuth(user, pass);
}
@Override
public List<String> fetch(int waitMs, int limit) {
- logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
List<String> out = new ArrayList<>();
try {
List<NameValuePair> urlParams = new ArrayList<>();
@@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
builder.setParameters(urlParams);
URI uri = builder.build();
- logger.info(String.format("GET %s", uri));
+ LOG.info(String.format("GET %s", uri));
HttpGet request = getReq(uri, waitMs);
CloseableHttpResponse response = getClient().execute(request);
int httpStatus = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- String body = (entity != null) ? EntityUtils.toString(entity) : null;
+ String body = (entity != null) ? entityToString(entity) : null;
- logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
body != null ? body.length() : "null"));
response.close();
if (httpStatus == 200 && body != null) {
JSONArray json = new JSONArray(body);
- logger.info(String.format("Got %d messages from DMaaP", json.length()));
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
for (int i = 0; i < json.length(); i++) {
out.add(json.getString(i));
}
} else {
- logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
sleep(waitMs);
}
} catch (Exception e) {
if (urls.size() > 1) {
String failedUrl = urls.remove(0);
urls.add(failedUrl);
- logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
urls.get(0)));
}
- logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
sleep(waitMs);
}
-
return out;
}
@@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
return String.format("Consumer listening to [%s]", hostStr);
}
- @Override
- public void useHttps(boolean yes) {
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
}
private void sleep(int ms) {
- logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
try {
Thread.sleep(ms);
} catch (InterruptedException e1) {
- logger.error("Interrupted while sleeping", e1);
+ LOG.error("Interrupted while sleeping", e1);
Thread.currentThread().interrupt();
}
}
- @Override
- public void close() {
- }
-
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
index 74c0c26b2..fe22ea10b 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -24,16 +24,14 @@
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -41,7 +39,7 @@ import org.onap.appc.adapter.message.Producer;
public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
private static final String CONTENT_TYPE = "application/cambria";
private static final String URL_TEMPLATE = "%s/events/%s";
@@ -49,19 +47,20 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
private List<String> hosts;
private Set<String> topics;
- private boolean useHttps = false;
+ public HttpDmaapProducerImpl() {
+ //for test purposes
+ }
public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
- hosts = new ArrayList<>();
topics = new HashSet<>();
topics.add(topicName);
+ hosts = new ArrayList<>();
for (String host : urls) {
hosts.add(formatHostString(host));
}
}
-
@Override
public void updateCredentials(String user, String pass) {
LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
@@ -70,32 +69,16 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
@Override
public boolean post(String partition, String data) {
- int sent = 0;
+ long sent = 0;
try {
HttpPost request = postReq(null);
request.setHeader("Content-Type", CONTENT_TYPE);
request.setEntity(new StringEntity(bodyLine(partition, data)));
- for (String topic : topics) {
- String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
- try {
- request.setURI(new URI(uriStr));
- CloseableHttpResponse response = getClient().execute(request);
- if (response.getStatusLine().getStatusCode() == 200) {
- sent++;
- }
- response.close();
- } catch (Exception sendEx) {
- LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
- sendEx);
- if (hosts.size() > 1) {
- String failedUrl = hosts.remove(0);
- hosts.add(failedUrl);
- LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
- failedUrl, hosts.get(0)));
- }
- }
- }
+ sent = topics.stream()
+ .filter(topic -> sendRequest(request, topic))
+ .count();
+
} catch (Exception buildEx) {
LOG.error(
String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
@@ -105,26 +88,40 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
return sent == topics.size();
}
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
+ private boolean sendRequest(HttpPost request, String topic) {
+ boolean successful = false;
+ String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
+ try {
+ request.setURI(new URI(uriStr));
+ CloseableHttpResponse response = getClient().execute(request);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ successful = true;
+ }
+ response.close();
+ } catch (Exception sendEx) {
+ LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
+ sendEx);
+ if (hosts.size() > 1) {
+ String failedUrl = hosts.remove(0);
+ hosts.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
+ failedUrl, hosts.get(0)));
+ }
+ }
+ return successful;
}
/**
* Format the body for the application/cambria content type with no partitioning. See
*
- * @param msg
+ * @param message
* The message body to format
* @return A string in the application/cambria content type
*/
- private String bodyLine(String partition, String msg) {
- String p = (partition == null) ? "" : partition;
- String m = (msg == null) ? "" : msg;
- return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
+ private String bodyLine(String partition, String message) {
+ String prt = (partition == null) ? "" : partition;
+ String msg = (message == null) ? "" : message;
+ return String.format("%d.%d.%s%s", prt.length(), msg.length(), prt, msg);
}
- @Override
- public void close() {
- // Nothing to do
- }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
new file mode 100644
index 000000000..1d6fcfd43
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
@@ -0,0 +1,134 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonHttpClient {
+
+ private static final String HTTP = "http://";
+ private static final String HTTPS = "https://";
+ private static final String URL = "example.org/location";
+ private static final URI URI = java.net.URI.create(HTTP + URL);
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final int TIMEOUT = 15000;
+ private static final int TIMEOUT_OFFSET = 5000;
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+
+ private CommonHttpClient commonHttpClient;
+
+ @Before
+ public void setUp() {
+ commonHttpClient = new CommonHttpClient() {};
+ }
+
+ private void setBasicAuth() {
+ commonHttpClient.setBasicAuth(USERNAME, PASSWORD);
+ }
+
+ private void noBasicAuth() {
+ commonHttpClient.setBasicAuth(null, null);
+ }
+
+ @Test
+ public void shouldGetHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpGet httpGet = commonHttpClient.getReq(URI, TIMEOUT);
+
+ assertNotNull(httpGet);
+ assertNotNull(httpGet.getFirstHeader("Authorization"));
+ assertNotNull(httpGet.getConfig());
+ assertEquals(httpGet.getConfig().getSocketTimeout(), TIMEOUT + TIMEOUT_OFFSET);
+ }
+
+ @Test(expected = AuthenticationException.class)
+ public void shoudNotGetHttpRequest_whenBasicAuthNotSet() throws AuthenticationException {
+
+ noBasicAuth();
+
+ commonHttpClient.getReq(URI, TIMEOUT);
+ }
+
+ @Test
+ public void shouldPostHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpPost httpPost = commonHttpClient.postReq(URL);
+
+ assertNotNull(httpPost);
+ assertNotNull(httpPost.getFirstHeader("Authorization"));
+ assertNotNull(httpPost.getConfig());
+ assertEquals(httpPost.getConfig().getSocketTimeout(), TIMEOUT_OFFSET);
+ }
+
+ @Test(expected = AuthenticationException.class)
+ public void shoudNotPostHttpRequest_whenBasicAuthNotSet() throws AuthenticationException {
+
+ noBasicAuth();
+
+ commonHttpClient.postReq(URL);
+ }
+
+ @Test
+ public void shouldGetClient() {
+ assertNotNull(commonHttpClient.getClient());
+ }
+
+ @Test
+ public void shouldFormatHostString() {
+ String httpUrl = HTTP + URL + ":" + HTTP_PORT;
+ String httpsUrl = HTTPS + URL + ":" + HTTPS_PORT;
+ String outputUrl;
+
+ outputUrl = commonHttpClient.formatHostString(httpUrl);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl + "/");
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTP_PORT);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTPS_PORT);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+ }
+
+ private String assertMessage(String expected, String actual) {
+ return "Expected: " + expected + " Actual: " + actual;
+ }
+} \ No newline at end of file
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
new file mode 100644
index 000000000..d5707c22f
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
@@ -0,0 +1,164 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapConsumerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final Collection<String> OUTPUT_MSG = Arrays.asList("FirstMessage", "SecondMessage");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String CONSUMER_NAME = "Consumer";
+ private static final String CONSUMER_ID = "Id";
+ private static final String FILTER = "filter";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String MESSAGE_BODY = "[FirstMessage, SecondMessage]";
+ private static final int TIMEOUT_MS = 1000;
+ private static final int LIMIT = 1000;
+
+ @Spy
+ private HttpDmaapConsumerImpl httpDmaapConsumer;
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Mock
+ private HttpEntity entity;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapConsumer = new HttpDmaapConsumerImpl(URLS, TOPIC_NAME, CONSUMER_NAME, CONSUMER_ID, FILTER);
+ httpDmaapConsumer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapConsumer).getClient();
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ when(httpResponse.getEntity()).thenReturn(entity);
+ doReturn(MESSAGE_BODY).when(httpDmaapConsumer).entityToString(same(entity));
+ }
+
+ @Test
+ public void shouldGetHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch();
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotGetHttpRequest_whenCredencialsAreNotProvided() {
+
+ httpDmaapConsumer.updateCredentials(null, null);
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpGet.class))).thenThrow(new ClientProtocolException());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+} \ No newline at end of file
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
new file mode 100644
index 000000000..a3d2e3afa
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
@@ -0,0 +1,146 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapProducerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String PARTITION = "partition";
+ private static final String DATA = "data";
+
+
+ @Spy
+ private HttpDmaapProducerImpl httpDmaapProducer = new HttpDmaapProducerImpl();
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapProducer = new HttpDmaapProducerImpl(URLS, TOPIC_NAME);
+ httpDmaapProducer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapProducer).getClient();
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ }
+
+ @Test
+ public void shouldPostHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotPostHttpRequest_whenCredencialsAreNotProvided() {
+
+ httpDmaapProducer.updateCredentials(null, null);
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpPost.class))).thenThrow(new ClientProtocolException());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+} \ No newline at end of file
diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Consumer.java
index 92033064c..e3a46087d 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Consumer.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Consumer.java
@@ -9,15 +9,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
@@ -30,14 +30,14 @@ public interface Consumer {
/**
* Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty.
- *
+ *
* @return A list of strings representing the messages pulled from the topic.
*/
- public List<String> fetch();
+ List<String> fetch();
/**
* Gets a batch of messages from the topic.
- *
+ *
* @param waitMs
* The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no
* less than 15000ms to prevent too many requests
@@ -45,29 +45,29 @@ public interface Consumer {
* The amount of messages to fetch
* @return A list of strings representing the messages pulled from the topic.
*/
- public List<String> fetch(int waitMs, int limit);
+ List<String> fetch(int waitMs, int limit);
/**
- * Updates the api credentials for making authenticated requests
- *
+ * Updates the api credentials for making authenticated requests.
+ *
* @param apiKey
* The public key to authenticate with
* @param apiSecret
* The secret key to authenticate with
*/
- public void updateCredentials(String apiKey, String apiSecret);
+ void updateCredentials(String apiKey, String apiSecret);
/**
* Creates a dmaap client using a https connection
- *
+ *
* @param yes
- * True if https should be used, false otherwise
+ * True if https should be used, false otherwise.
*/
- public void useHttps(boolean yes);
+ default void useHttps(boolean yes) {}
/**
- * Closes the dmaap client https connection
+ * Closes the dmaap client https connection.
*/
- void close();
+ default void close() {}
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Producer.java
index 52ead0253..c7194d080 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Producer.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/onap/appc/adapter/message/Producer.java
@@ -9,15 +9,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
@@ -26,29 +26,29 @@ package org.onap.appc.adapter.message;
public interface Producer {
- public boolean post(String partition, String data);
+ boolean post(String partition, String data);
/**
- * Updates the api credentials for making authenticated requests
- *
+ * Updates the api credentials for making authenticated requests.
+ *
* @param apiKey
* The public key to authenticate with
* @param apiSecret
* The secret key to authenticate with
*/
- public void updateCredentials(String apiKey, String apiSecret);
+ void updateCredentials(String apiKey, String apiSecret);
/**
- * Creates a dmaap client using a https connection
- *
+ * Creates a dmaap client using a https connection.
+ *
* @param yes
* True if https should be used, false otherwise
*/
- public void useHttps(boolean yes);
+ default void useHttps(boolean yes) {}
/**
- * Closes the dmaap client https connection
+ * Closes the dmaap client https connection.
*/
- void close();
+ default void close() {}
}