diff options
Diffstat (limited to 'dmaap-listener')
4 files changed, 467 insertions, 250 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java index d261d1c49..13146e4ec 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -57,6 +58,7 @@ public class MessageRouterHttpClient implements SdncDmaapConsumer { protected final String DEFAULT_READ_TIMEOUT_MINUTES = "3"; protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000"; protected final String DEFAULT_LIMIT = null; + protected final String DEFAULT_FETCH_PAUSE = "5000"; public MessageRouterHttpClient() { @@ -105,52 +107,60 @@ public class MessageRouterHttpClient implements SdncDmaapConsumer { public void init(Properties baseProperties, String consumerPropertiesPath) { try { baseProperties.load(new FileInputStream(new File(consumerPropertiesPath))); - this.properties = baseProperties; - String username = baseProperties.getProperty("username"); - String password = baseProperties.getProperty("password"); - String topic = baseProperties.getProperty("topic"); - String group = baseProperties.getProperty("group"); - String host = baseProperties.getProperty("host"); - String id = baseProperties.getProperty("id"); - - String filter = baseProperties.getProperty("filter"); - if (filter != null) { - if (filter.length() > 0) { + processProperties(baseProperties); + } catch (FileNotFoundException e) { + Log.error("FileNotFoundException while reading consumer properties", e); + } catch (IOException e) { + Log.error("IOException while reading consumer properties", e); + } + } + + protected void processProperties(Properties properties) { + this.properties = properties; + String username = properties.getProperty("username"); + String password = properties.getProperty("password"); + String topic = properties.getProperty("topic"); + String group = properties.getProperty("group"); + String host = properties.getProperty("host"); + String id = properties.getProperty("id"); + + String filter = properties.getProperty("filter"); + if (filter != null) { + if (filter.length() > 0) { + try { filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name()); - } else { + } catch (UnsupportedEncodingException e) { + Log.error("Filter could not be encoded, setting to null", e); filter = null; } + } else { + filter = null; } + } - String limitString = baseProperties.getProperty("limit", DEFAULT_LIMIT); - Integer limit = null; - if (limitString != null && limitString.length() > 0) { - limit = Integer.valueOf(limitString); - } - - Integer timeoutQueryParamValue = - Integer.valueOf(baseProperties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE)); - Integer connectTimeoutSeconds = Integer - .valueOf(baseProperties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS)); - Integer readTimeoutMinutes = - Integer.valueOf(baseProperties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES)); - - Builder builder = client.target(uri).request("application/json"); - if (username != null && password != null && username.length() > 0 && password.length() > 0) { - String authorizationString = buildAuthorizationString(username, password); - builder.header("Authorization", authorizationString); - } - this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter); - this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes); + String limitString = properties.getProperty("limit", DEFAULT_LIMIT); + Integer limit = null; + if (limitString != null && limitString.length() > 0) { + limit = Integer.valueOf(limitString); + } - this.getMessages = builder.buildGet(); - this.fetchPause = Integer.valueOf(baseProperties.getProperty("fetchPause")); - this.isReady = true; - } catch (FileNotFoundException e) { - Log.error("FileNotFoundException while reading consumer properties", e); - } catch (IOException e) { - Log.error("IOException while reading consumer properties", e); + Integer timeoutQueryParamValue = + Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE)); + Integer connectTimeoutSeconds = Integer + .valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS)); + Integer readTimeoutMinutes = + Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES)); + this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes); + this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter); + Builder builder = client.target(uri).request("application/json"); + if (username != null && password != null && username.length() > 0 && password.length() > 0) { + String authorizationString = buildAuthorizationString(username, password); + builder.header("Authorization", authorizationString); } + + this.getMessages = builder.buildGet(); + this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause",DEFAULT_FETCH_PAUSE)); + this.isReady = true; } @Override diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java index d720e5fcd..3336b81f6 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java @@ -1,211 +1,221 @@ -/*-
- * ============LICENSE_START=======================================================
- * openECOMP : SDN-C
- * ================================================================================
- * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.ccsdk.sli.northbound.dmaapclient;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.net.HttpURLConnection;
-
-/*
- * java.net based client to build message router consumers
- */
-public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
- private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
-
- protected Boolean isReady = false;
- protected Boolean isRunning = false;
- protected URL url;
- protected Integer fetchPause;
- protected Properties properties;
- protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
- protected final String DEFAULT_READ_TIMEOUT = "180000";
- protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
- protected final String DEFAULT_LIMIT = null;
- private String authorizationString;
- protected Integer connectTimeout;
- protected Integer readTimeout;
- protected String topic;
-
- public MessageRouterHttpClientJdk() {}
-
- @Override
- public void run() {
- if (isReady) {
- isRunning = true;
- while (isRunning) {
- HttpURLConnection httpUrlConnection = null;
- try {
- httpUrlConnection = (HttpURLConnection) url.openConnection();
- if (authorizationString != null) {
- httpUrlConnection.addRequestProperty("Authorization", authorizationString);
- }
- httpUrlConnection.setRequestMethod("GET");
- httpUrlConnection.setRequestProperty("Accept", "application/json");
- httpUrlConnection.setUseCaches(false);
- httpUrlConnection.setConnectTimeout(connectTimeout);
- httpUrlConnection.setReadTimeout(readTimeout);
- httpUrlConnection.connect();
- int status = httpUrlConnection.getResponseCode();
- Log.info("GET " + url + " returned http status " + status);
- if (status < 300) {
- BufferedReader br =
- new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
- StringBuilder sb = new StringBuilder();
- String line;
- while ((line = br.readLine()) != null) {
- sb.append(line + "\n");
- }
- br.close();
- String responseBody = sb.toString();
- if (responseBody.contains("{")) {
- // Get rid of opening [" entity =
- responseBody = responseBody.substring(2);
- // Get rid of closing "]
- responseBody = responseBody.substring(0, responseBody.length() - 2);
- // Split the json array into individual elements to process
- for (String message : responseBody.split("\",\"")) {
- // unescape the json
- message = message.replace("\\\"", "\"");
- // Topic names cannot contain periods
- processMsg(message);
- }
- } else {
- Log.info("Entity doesn't appear to contain JSON elements, logging body");
- Log.info(responseBody);
- }
- }
- } catch (Exception e) {
- Log.error("GET " + url + " failed.", e);
- } finally {
- if (httpUrlConnection != null) {
- httpUrlConnection.disconnect();
- }
- Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
- try {
- Thread.sleep(fetchPause);
- } catch (InterruptedException e) {
- Log.error("Could not sleep thread", e);
- }
- }
- }
- }
- }
-
- @Override
- public void init(Properties baseProperties, String consumerPropertiesPath) {
- try {
- baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
-
- this.properties = baseProperties;
- String username = properties.getProperty("username");
- String password = properties.getProperty("password");
- topic = properties.getProperty("topic");
- String group = properties.getProperty("group");
- String host = properties.getProperty("host");
- String id = properties.getProperty("id");
-
- String filter = properties.getProperty("filter");
- if (filter != null) {
- if (filter.length() > 0) {
- try {
- filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- Log.error("Couldn't encode filter string", e);
- }
- } else {
- filter = null;
- }
- }
-
- String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
- Integer limit = null;
- if (limitString != null && limitString.length() > 0) {
- limit = Integer.valueOf(limitString);
- }
-
- Integer timeoutQueryParamValue =
- Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
- connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
- readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
- if (username != null && password != null && username.length() > 0 && password.length() > 0) {
- authorizationString = buildAuthorizationString(username, password);
- }
- String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
- this.url = new URL(urlString);
- this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));
- this.isReady = true;
- } catch (FileNotFoundException e) {
- Log.error("FileNotFoundException while reading consumer properties", e);
- } catch (IOException e) {
- Log.error("IOException while reading consumer properties", e);
- }
- }
-
- public void processMsg(String msg) {
- Log.info(msg);
- }
-
- protected String buildAuthorizationString(String userName, String password) {
- String basicAuthString = userName + ":" + password;
- basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
- return "Basic " + basicAuthString;
- }
-
- protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
- Integer timeout, Integer limit, String filter) {
- StringBuilder sb = new StringBuilder();
- sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
- sb.append("?timeout=" + timeout);
-
- if (limit != null) {
- sb.append("&limit=" + limit);
- }
- if (filter != null) {
- sb.append("&filter=" + filter);
- }
- return sb.toString();
- }
-
- @Override
- public boolean isReady() {
- return isReady;
- }
-
- @Override
- public boolean isRunning() {
- return isRunning;
- }
-
-}
+/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.northbound.dmaapclient; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * java.net based client to build message router consumers + */ +public class MessageRouterHttpClientJdk implements SdncDmaapConsumer { + private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class); + + protected Boolean isReady = false; + protected Boolean isRunning = false; + protected URL url; + protected Integer fetchPause; + protected Properties properties; + protected final String DEFAULT_CONNECT_TIMEOUT = "30000"; + protected final String DEFAULT_READ_TIMEOUT = "180000"; + protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000"; + protected final String DEFAULT_LIMIT = null; + protected final String DEFAULT_FETCH_PAUSE = "5000"; + + private String authorizationString; + protected Integer connectTimeout; + protected Integer readTimeout; + protected String topic; + + public MessageRouterHttpClientJdk() {} + + @Override + public void run() { + if (isReady) { + isRunning = true; + while (isRunning) { + HttpURLConnection httpUrlConnection = null; + try { + httpUrlConnection = buildHttpURLConnection(); + httpUrlConnection.connect(); + int status = httpUrlConnection.getResponseCode(); + Log.info("GET " + url + " returned http status " + status); + if (status < 300) { + BufferedReader br = + new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream())); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = br.readLine()) != null) { + sb.append(line + "\n"); + } + br.close(); + String responseBody = sb.toString(); + if (responseBody.contains("{")) { + // Get rid of opening [" entity = + responseBody = responseBody.substring(2); + // Get rid of closing "] + responseBody = responseBody.substring(0, responseBody.length() - 2); + // Split the json array into individual elements to process + for (String message : responseBody.split("\",\"")) { + // unescape the json + message = message.replace("\\\"", "\""); + // Topic names cannot contain periods + processMsg(message); + } + } else { + Log.info("Entity doesn't appear to contain JSON elements, logging body"); + Log.info(responseBody); + } + } + } catch (Exception e) { + Log.error("GET " + url + " failed.", e); + } finally { + if (httpUrlConnection != null) { + httpUrlConnection.disconnect(); + } + Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again."); + try { + Thread.sleep(fetchPause); + } catch (InterruptedException e) { + Log.error("Could not sleep thread", e); + } + } + } + } + } + + @Override + public void init(Properties baseProperties, String consumerPropertiesPath) { + try { + baseProperties.load(new FileInputStream(new File(consumerPropertiesPath))); + processProperties(baseProperties); + } catch (FileNotFoundException e) { + Log.error("FileNotFoundException while reading consumer properties", e); + } catch (IOException e) { + Log.error("IOException while reading consumer properties", e); + } + } + + protected void processProperties(Properties properties) throws MalformedURLException { + this.properties = properties; + String username = properties.getProperty("username"); + String password = properties.getProperty("password"); + topic = properties.getProperty("topic"); + String group = properties.getProperty("group"); + String host = properties.getProperty("host"); + String id = properties.getProperty("id"); + + String filter = properties.getProperty("filter"); + if (filter != null) { + if (filter.length() > 0) { + try { + filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + Log.error("Couldn't encode filter string", e); + } + } else { + filter = null; + } + } + + String limitString = properties.getProperty("limit", DEFAULT_LIMIT); + Integer limit = null; + if (limitString != null && limitString.length() > 0) { + limit = Integer.valueOf(limitString); + } + + Integer timeoutQueryParamValue = + Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE)); + connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT)); + readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT)); + if (username != null && password != null && username.length() > 0 && password.length() > 0) { + authorizationString = buildAuthorizationString(username, password); + } + String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter); + this.url = new URL(urlString); + this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause", DEFAULT_FETCH_PAUSE)); + this.isReady = true; + } + + public void processMsg(String msg) { + Log.info(msg); + } + + protected String buildAuthorizationString(String userName, String password) { + String basicAuthString = userName + ":" + password; + basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); + return "Basic " + basicAuthString; + } + + protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host, + Integer timeout, Integer limit, String filter) { + StringBuilder sb = new StringBuilder(); + sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId); + sb.append("?timeout=" + timeout); + + if (limit != null) { + sb.append("&limit=" + limit); + } + if (filter != null) { + sb.append("&filter=" + filter); + } + return sb.toString(); + } + + @Override + public boolean isReady() { + return isReady; + } + + @Override + public boolean isRunning() { + return isRunning; + } + + protected HttpURLConnection buildHttpURLConnection() throws IOException { + HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); + if (authorizationString != null) { + httpUrlConnection.setRequestProperty("Authorization", authorizationString); + } + httpUrlConnection.setRequestMethod("GET"); + httpUrlConnection.setRequestProperty("Accept", "application/json"); + httpUrlConnection.setUseCaches(false); + httpUrlConnection.setConnectTimeout(connectTimeout); + httpUrlConnection.setReadTimeout(readTimeout); + return httpUrlConnection; + } + +} diff --git a/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdkTest.java b/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdkTest.java new file mode 100644 index 000000000..03f832801 --- /dev/null +++ b/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdkTest.java @@ -0,0 +1,100 @@ +package org.onap.ccsdk.sli.northbound.dmaapclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Properties; +import org.junit.Test; + +public class MessageRouterHttpClientJdkTest { + public MessageRouterHttpClientJdk getClient() throws MalformedURLException { + Properties properties = new Properties(); + properties.put("username", "my_user"); + properties.put("password", "my_password"); + properties.put("topic", "network_automation"); + properties.put("group", "message_processors"); + properties.put("host", "dmaap-server.com"); + properties.put("id", "machine_one"); + properties.put("fetchPause", "3000"); + MessageRouterHttpClientJdk client = new MessageRouterHttpClientJdk(); + client.processProperties(properties); + return client; + } + + @Test + public void processMsg() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = getClient(); + client.processMsg(null); + } + + @Test + public void isReady() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = getClient(); + assertEquals(true, client.isReady()); + } + + @Test + public void isRunning() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = getClient(); + assertEquals(false, client.isRunning()); + } + + @Test + public void buidUrl() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = getClient(); + assertEquals(new URL( + "http://dmaap-server.com/events/network_automation/message_processors/machine_one?timeout=15000"), + client.url); + } + + @Test + public void buidUrlWithFilter() throws InvalidMessageException, MalformedURLException { + Properties properties = new Properties(); + properties.put("username", "my_user"); + properties.put("password", "my_password"); + properties.put("topic", "network_automation"); + properties.put("group", "message_processors"); + properties.put("host", "dmaap-server.com"); + properties.put("id", "machine_one"); + properties.put("filter", "{\"class\":\"Contains\",\"string\":\"hello\",\"value\":\"world\"}"); + properties.put("fetchPause", "3000"); + MessageRouterHttpClientJdk client = new MessageRouterHttpClientJdk(); + client.processProperties(properties); + assertEquals(new URL( + "http://dmaap-server.com/events/network_automation/message_processors/machine_one?timeout=15000&filter=%7B%22class%22%3A%22Contains%22%2C%22string%22%3A%22hello%22%2C%22value%22%3A%22world%22%7D"), + client.url); + } + + @Test + public void buildAuthorizationString() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = getClient(); + String authString = client.buildAuthorizationString("Hello", "World"); + assertEquals("Basic SGVsbG86V29ybGQ=", authString); + } + + @Test + public void clientFromProperties() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClientJdk client = new MessageRouterHttpClientJdk(); + Properties props = new Properties(); + client.init(props, "src/test/resources/dmaap-consumer-1.properties"); + assertEquals(new URL( + "http://localhost:3904/events/ccsdk-topic/ccsdk-unittest/ccsdk_unittest?timeout=15000&limit=1000"), + client.url); + } + + @Test + public void buildHttpURLConnection() throws InvalidMessageException, IOException { + MessageRouterHttpClientJdk client = getClient(); + HttpURLConnection connection = client.buildHttpURLConnection(); + assertEquals("GET", connection.getRequestMethod()); + assertTrue(connection.getRequestProperties().get("Accept").contains("application/json")); + assertEquals(false, connection.getUseCaches()); + Integer defaultConnectTimeout = Integer.valueOf(client.DEFAULT_CONNECT_TIMEOUT); + Integer defaultReadTimeout = Integer.valueOf(client.DEFAULT_READ_TIMEOUT); + assertEquals(defaultConnectTimeout.intValue(), connection.getConnectTimeout()); + assertEquals(defaultReadTimeout.intValue(), connection.getReadTimeout()); + } +} diff --git a/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientTest.java b/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientTest.java new file mode 100644 index 000000000..7567e2a9e --- /dev/null +++ b/dmaap-listener/src/test/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientTest.java @@ -0,0 +1,97 @@ +package org.onap.ccsdk.sli.northbound.dmaapclient; + +import static org.junit.Assert.assertEquals; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import org.junit.Test; + +public class MessageRouterHttpClientTest { + + class MockMessageRouterHttpClient extends MessageRouterHttpClient { + protected Client getClient(Integer connectTimeoutSeconds, Integer readTimeoutMinutes) { + ClientBuilder clientBuilder = ClientBuilder.newBuilder(); + return clientBuilder.build(); + } + } + + public MessageRouterHttpClient getClient() { + Properties properties = new Properties(); + properties.put("username", "my_user"); + properties.put("password", "my_password"); + properties.put("topic", "network_automation"); + properties.put("group", "message_processors"); + properties.put("host", "dmaap-server.com"); + properties.put("id", "machine_one"); + properties.put("fetch", "machine_one"); + + MockMessageRouterHttpClient client = new MockMessageRouterHttpClient(); + client.processProperties(properties); + return client; + } + + @Test + public void processMsg() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClient client = getClient(); + client.processMsg(null); + } + + @Test + public void isReady() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClient client = getClient(); + assertEquals(true, client.isReady()); + } + + @Test + public void isRunning() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClient client = getClient(); + assertEquals(false, client.isRunning()); + } + + @Test + public void buidUrl() throws InvalidMessageException, MalformedURLException, URISyntaxException { + MessageRouterHttpClient client = getClient(); + assertEquals(new URI( + "http://dmaap-server.com/events/network_automation/message_processors/machine_one?timeout=15000"), + client.uri); + } + + @Test + public void buidUrlWithFilter() throws InvalidMessageException, MalformedURLException, URISyntaxException { + Properties properties = new Properties(); + properties.put("username", "my_user"); + properties.put("password", "my_password"); + properties.put("topic", "network_automation"); + properties.put("group", "message_processors"); + properties.put("host", "dmaap-server.com"); + properties.put("id", "machine_one"); + properties.put("filter", "{\"class\":\"Contains\",\"string\":\"hello\",\"value\":\"world\"}"); + properties.put("fetchPause", "3000"); + MessageRouterHttpClient client = new MockMessageRouterHttpClient(); + client.processProperties(properties); + assertEquals(new URI( + "http://dmaap-server.com/events/network_automation/message_processors/machine_one?timeout=15000&filter=%7B%22class%22%3A%22Contains%22%2C%22string%22%3A%22hello%22%2C%22value%22%3A%22world%22%7D"), + client.uri); + } + + @Test + public void buildAuthorizationString() throws InvalidMessageException, MalformedURLException { + MessageRouterHttpClient client = getClient(); + String authString = client.buildAuthorizationString("Hello", "World"); + assertEquals("Basic SGVsbG86V29ybGQ=", authString); + } + + @Test + public void clientFromProperties() throws InvalidMessageException, MalformedURLException, URISyntaxException { + MessageRouterHttpClient client = new MockMessageRouterHttpClient(); + Properties props = new Properties(); + client.init(props, "src/test/resources/dmaap-consumer-1.properties"); + assertEquals(new URI( + "http://localhost:3904/events/ccsdk-topic/ccsdk-unittest/ccsdk_unittest?timeout=15000&limit=1000"), + client.uri); + } + +} |