aboutsummaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java20
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java69
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java71
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java14
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java18
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java28
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java288
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java44
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java86
-rw-r--r--src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java2
-rw-r--r--src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java2
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java5
12 files changed, 291 insertions, 356 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
index 8936bea..91e10e0 100644
--- a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
@@ -42,6 +42,7 @@ import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
*/
public class MRClientBuilders
{
+ private final static String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name.";
/**
* Instantiates MRClientBuilders.
@@ -56,10 +57,6 @@ public class MRClientBuilders
*/
public static class ConsumerBuilder
{
- /**
- * Construct a consumer builder.
- */
- public ConsumerBuilder () {}
/**
* Set the host list
@@ -151,7 +148,7 @@ public class MRClientBuilders
{
if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
{
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
}
if ( fGroup == null )
@@ -194,7 +191,6 @@ public class MRClientBuilders
*/
public static class PublisherBuilder
{
- public PublisherBuilder () {}
/**
* Set the MR/UEB host(s) to use
@@ -298,7 +294,7 @@ public class MRClientBuilders
{
if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
{
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
}
if ( sfPublisherMock != null ) return sfPublisherMock;
@@ -331,10 +327,6 @@ public class MRClientBuilders
*/
public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
{
- /**
- * Construct an identity manager builder.
- */
- public IdentityManagerBuilder () {}
@Override
protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
@@ -350,10 +342,6 @@ public class MRClientBuilders
*/
public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
{
- /**
- * Construct an topic manager builder.
- */
- public TopicManagerBuilder () {}
@Override
protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
@@ -434,7 +422,7 @@ public class MRClientBuilders
{
if ( fHosts.isEmpty() )
{
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
}
final T mgr = constructClient ( fHosts );
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
index e5ea48e..1780703 100644
--- a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
@@ -23,11 +23,7 @@
*******************************************************************************/
package org.onap.dmaap.mr.client;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
+import java.io.*;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Map;
@@ -62,10 +58,13 @@ public class MRClientFactory {
private static final String AUTH_DATE = "authDate";
private static final String PASSWORD = "password";
private static final String USERNAME = "username";
+ private static final String FILTER = "filter";
+ private static final String HOST = "host";
private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
private static final String TOPIC = "topic";
private static final String TRANSPORT_TYPE = "TransportType";
- public static MultivaluedMap<String, Object> HTTPHeadersMap;
+
+ private static MultivaluedMap<String, Object> httpHeadersMap;
public static Map<String, String> DME2HeadersMap;
public static String routeFilePath;
@@ -80,7 +79,23 @@ public class MRClientFactory {
private MRClientFactory() {
//prevents instantiation.
}
-
+
+ /**
+ * Add getter to avoid direct access to static header map.
+ * @return
+ */
+ public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
+ return httpHeadersMap;
+ }
+
+ /**
+ * Add setter to avoid direct access to static header map.
+ * @param headers
+ */
+ public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
+ httpHeadersMap = headers;
+ }
+
/**
* Create a consumer instance with the default timeout and no limit on
* messages returned. This consumer operates as an independent consumer
@@ -400,8 +415,6 @@ public class MRClientFactory {
* use gzip compression
* @param protocolFlag
* http auth or ueb auth or dme2 method
- * @param producerFilePath
- * all properties for publisher
* @return MRBatchingPublisher obj
*/
public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
@@ -421,7 +434,7 @@ public class MRClientFactory {
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
*
- * @param Properties
+ * @param props
* props set all properties for publishing message
* @return MRBatchingPublisher obj
* @throws FileNotFoundException
@@ -438,7 +451,7 @@ public class MRClientFactory {
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
*
- * @param Properties
+ * @param props
* props set all properties for publishing message
* @return MRBatchingPublisher obj
* @throws FileNotFoundException
@@ -465,9 +478,10 @@ public class MRClientFactory {
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
Properties props = new Properties();
- props.load(reader);
+ try(InputStream input = new FileInputStream(producerFilePath)) {
+ props.load(input);
+ }
return createBatchingPublisher(props);
}
@@ -485,9 +499,10 @@ public class MRClientFactory {
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
Properties props = new Properties();
- props.load(reader);
+ try(InputStream input = new FileInputStream(producerFilePath)) {
+ props.load(input);
+ }
return createBatchingPublisher(props, withResponse);
}
@@ -497,7 +512,7 @@ public class MRClientFactory {
MRSimplerBatchPublisher pub;
if (withResponse) {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
@@ -506,14 +521,14 @@ public class MRClientFactory {
.withResponse(withResponse).build();
} else {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
.compress(Boolean.parseBoolean(props.getProperty("compress")))
.httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
}
- pub.setHost(props.getProperty("host"));
+ pub.setHost(props.getProperty(HOST));
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
pub.setAuthKey(props.getProperty(AUTH_KEY));
@@ -638,10 +653,10 @@ public class MRClientFactory {
}
public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(consumerFilePath));
Properties props = new Properties();
- props.load(reader);
-
+ try(InputStream input = new FileInputStream(consumerFilePath)) {
+ props.load(input);
+ }
return createConsumer(props);
}
@@ -665,10 +680,10 @@ public class MRClientFactory {
MRConsumerImpl sub = null;
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
- .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
.setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
.setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
- .setFilter(props.getProperty("filter"))
+ .setFilter(props.getProperty(FILTER))
.setApiKey_username(props.getProperty(AUTH_KEY))
.setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
sub.setAuthKey(props.getProperty(AUTH_KEY));
@@ -677,10 +692,10 @@ public class MRClientFactory {
sub.setPassword(props.getProperty(PASSWORD));
} else {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
- .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
.setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
.setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
- .setFilter(props.getProperty("filter"))
+ .setFilter(props.getProperty(FILTER))
.setApiKey_username(props.getProperty(USERNAME))
.setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
sub.setUsername(props.getProperty(USERNAME));
@@ -688,9 +703,9 @@ public class MRClientFactory {
}
sub.setProps(props);
- sub.setHost(props.getProperty("host"));
+ sub.setHost(props.getProperty(HOST));
sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
- sub.setfFilter(props.getProperty("filter"));
+ sub.setfFilter(props.getProperty(FILTER));
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
index b29c100..07cf6a7 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
@@ -48,10 +48,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
public class MRBaseClient extends HttpClient implements MRClient {
+ private final static String HEADER_TRANSACTION_ID = "transactionid";
+
+ private final static String JSON_RESULT = "result";
+ private final static String JSON_STATUS = "status";
+
+ private final static String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.";
+ private final static String LOG_TRANSACTION_ID = "TransactionId : ";
+
private ClientConfig clientConfig = null;
protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
+ super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT);
fLog = LoggerFactory.getLogger(this.getClass().getName());
}
@@ -63,7 +71,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
}
protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
+ super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L,
TimeUnit.MILLISECONDS, 32, 32, 600000);
fLog = LoggerFactory.getLogger(this.getClass().getName());
@@ -79,6 +87,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
@Override
public void close() {
+ // nothing to close
}
protected Set<String> jsonArrayToSet(JSONArray a) {
@@ -115,8 +124,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
return getResponseDataInJson(response);
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -149,8 +157,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
responseData = (String) response.readEntity(String.class);
return responseData;
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -180,8 +187,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType());
return getResponseDataInJson(response);
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -198,8 +204,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
return responseData;
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -222,8 +227,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
}
return getResponseDataInJson(response);
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -241,18 +245,17 @@ public class MRBaseClient extends HttpClient implements MRClient {
String encoding = Base64.encodeAsString(username + ":" + password);
response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
}
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
- String transactionid = response.getHeaderString("transactionid");
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
+ fLog.info(LOG_TRANSACTION_ID + transactionid);
}
responseData = (String) response.readEntity(String.class);
return responseData;
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -266,8 +269,7 @@ public class MRBaseClient extends HttpClient implements MRClient {
return getResponseDataInJson(response);
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -290,18 +292,17 @@ public class MRBaseClient extends HttpClient implements MRClient {
target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
- String transactionid = response.getHeaderString("transactionid");
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
+ fLog.info(LOG_TRANSACTION_ID + transactionid);
}
responseData = (String) response.readEntity(String.class);
return responseData;
} else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(AUTH_FAILED);
}
}
@@ -313,11 +314,11 @@ public class MRBaseClient extends HttpClient implements MRClient {
target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
response = DmaapClientUtil.getResponsewtNoAuth(target);
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
- String transactionid = response.getHeaderString("transactionid");
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
+ fLog.info(LOG_TRANSACTION_ID + transactionid);
}
responseData = (String) response.readEntity(String.class);
@@ -327,13 +328,13 @@ public class MRBaseClient extends HttpClient implements MRClient {
private JSONObject getResponseDataInJson(Response response) throws JSONException {
try {
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
// MultivaluedMap<String, Object> headersMap =
// for(String key : headersMap.keySet()) {
- String transactionid = response.getHeaderString("transactionid");
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
+ fLog.info(LOG_TRANSACTION_ID + transactionid);
}
if (response.getStatus() == 403) {
@@ -341,8 +342,8 @@ public class MRBaseClient extends HttpClient implements MRClient {
jsonObject = new JSONObject();
JSONArray jsonArray = new JSONArray();
jsonArray.put(response.getEntity());
- jsonObject.put("result", jsonArray);
- jsonObject.put("status", response.getStatus());
+ jsonObject.put(JSON_RESULT, jsonArray);
+ jsonObject.put(JSON_STATUS, response.getStatus());
return jsonObject;
}
String responseData = (String) response.readEntity(String.class);
@@ -354,11 +355,11 @@ public class MRBaseClient extends HttpClient implements MRClient {
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- jsonObject.put("status", response.getStatus());
+ jsonObject.put(JSON_RESULT, jsonArray);
+ jsonObject.put(JSON_STATUS, response.getStatus());
} else {
jsonObject = new JSONObject(jsonTokener);
- jsonObject.put("status", response.getStatus());
+ jsonObject.put(JSON_STATUS, response.getStatus());
}
return jsonObject;
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
index bcd4403..5c7259c 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
@@ -54,7 +54,7 @@ import org.onap.dmaap.mr.client.response.MRPublisherResponse;
@Deprecated
public class MRBatchPublisher implements MRBatchingPublisher
{
- public static final long kMinMaxAgeMs = 1;
+ public static final long MIN_MAX_AGE_MS = 1;
/**
* Create a batch publisher.
@@ -66,10 +66,10 @@ public class MRBatchPublisher implements MRBatchingPublisher
*/
public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
{
- if ( maxAgeMs < kMinMaxAgeMs )
+ if ( maxAgeMs < MIN_MAX_AGE_MS)
{
- fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
- maxAgeMs = kMinMaxAgeMs;
+ fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS);
+ maxAgeMs = MIN_MAX_AGE_MS;
}
try {
@@ -279,7 +279,7 @@ public class MRBatchPublisher implements MRBatchingPublisher
/**
* Called to queue a message.
- * @param m
+ * @param msgs
* @throws IOException
*/
public void queue ( Collection<message> msgs ) throws IOException
@@ -364,7 +364,7 @@ public class MRBatchPublisher implements MRBatchingPublisher
fLog.warn ( "Send failed, rebuilding send queue." );
// note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
+ fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis ();
// the send failed. reconstruct the pending queue
fWriteLock.lock ();
@@ -406,7 +406,7 @@ public class MRBatchPublisher implements MRBatchingPublisher
private final WriteLock fWriteLock;
private final ReadLock fReadLock;
private long fDontSendUntilMs;
- private static final long sfWaitAfterError = 1000;
+ private static final long SF_WAIT_AFTER_ERROR = 1000;
}
// this is static so that it's clearly not using any mutable member data outside of a lock
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
index ed23918..6a13910 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
@@ -38,21 +38,15 @@ public class MRClientVersionInfo
private static final Properties props = new Properties();
private static final String version;
- static
- {
+ static {
String use = null;
- try
- {
- final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" );
- if ( is != null )
- {
- props.load ( is );
+ try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties" )) {
+ if (is != null) {
+ props.load(is);
use = props.getProperty ( "MRClientVersion", null );
}
- }
- catch ( IOException e )
- {
- logger.error("exception: ", e);
+ } catch ( IOException e ) {
+ logger.error("exception: ", e);
}
version = use;
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
index ec17610..dbf6b4d 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
@@ -32,9 +32,9 @@ import org.apache.http.HttpHost;
class MRConstants
{
private static final String PROTOCOL = "http";
- public static final String context = "/";
- public static final String kBasePath = "events/";
- public static final int kStdMRServicePort = 8080;
+ public static final String CONTEXT = "/";
+ public static final String BASE_PATH = "events/";
+ public static final int STD_MR_SERVICE_PORT = 8080;
public static String escape ( String s )
{
@@ -53,8 +53,8 @@ class MRConstants
final String cleanTopic = escape ( rawTopic );
final StringBuffer url = new StringBuffer().
- append ( MRConstants.context ).
- append ( MRConstants.kBasePath ).
+ append ( MRConstants.CONTEXT).
+ append ( MRConstants.BASE_PATH).
append ( cleanTopic );
return url.toString ();
}
@@ -68,8 +68,8 @@ class MRConstants
url.append( PROTOCOL + "://" );
}
url.append(host);
- url.append ( MRConstants.context );
- url.append ( MRConstants.kBasePath );
+ url.append ( MRConstants.CONTEXT);
+ url.append ( MRConstants.BASE_PATH);
url.append ( cleanTopic );
return url.toString ();
}
@@ -86,8 +86,8 @@ class MRConstants
url.append( PROTOCOL + "://" );
}
url.append(host);
- url.append ( MRConstants.context );
- url.append ( MRConstants.kBasePath );
+ url.append ( MRConstants.CONTEXT);
+ url.append ( MRConstants.BASE_PATH);
url.append ( cleanTopic );
if(parttion!=null && !parttion.equalsIgnoreCase(""))
url.append("?partitionKey=").append(parttion);
@@ -97,7 +97,7 @@ class MRConstants
{
final String cleanConsumerGroup = escape ( rawConsumerGroup );
final String cleanConsumerId = escape ( rawConsumerId );
- return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
+ return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
}
/**
@@ -122,7 +122,7 @@ class MRConstants
* Return an HttpHost from an input string. Input string has
* host[:port] as format. If the port section is not provided, the default port is used.
*
- * @param hosts
+ * @param host
* @return a list of hosts
*/
public static HttpHost hostForString ( String host )
@@ -130,7 +130,7 @@ class MRConstants
if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
String hostPart = host;
- int port = kStdMRServicePort;
+ int port = STD_MR_SERVICE_PORT;
final int colon = host.indexOf ( ':' );
if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
@@ -169,8 +169,8 @@ class MRConstants
}
url.append(host);
- url.append(context);
- url.append(kBasePath);
+ url.append(CONTEXT);
+ url.append(BASE_PATH);
url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId);
return url.toString();
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
index 0b06f77..6c67313 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
@@ -28,19 +28,13 @@ import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+
+import java.io.*;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
@@ -55,15 +49,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
private Logger log = LoggerFactory.getLogger(this.getClass().getName());
- public static final String routerFilePath = null;
+ public static final String ROUTER_FILE_PATH = null;
public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
public String consumerFilePath;
+ private static final String JSON_RESULT = "result";
+ private static final String PROPS_PROTOCOL = "Protocol";
+
+ private static final String EXECPTION_MESSAGE = "exception: ";
private static final String SUCCESS_MESSAGE = "Success";
private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
+ private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
+ private static final String URL_PARAM_PARTNER = "partner";
+ private static final String URL_PARAM_ENV_CONTEXT = "envContext";
+ private static final String URL_PARAM_VERSION = "version";
+ private static final String URL_PARAM_FILTER = "filter";
+ private static final String URL_PARAM_LIMIT = "limit";
+ private static final String URL_PARAM_TIMEOUT = "timeout";
+
private final String fTopic;
private final String fGroup;
private final String fId;
@@ -186,124 +192,67 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
final LinkedList<String> msgs = new LinkedList<>();
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- dmeConfigure(timeoutMs, limit);
- try {
- String reply = sender.sendAndWait(timeoutMs + 10000L);
- final JSONObject o = getResponseDataInJson(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = get(urlPath, username, password, protocolFlag);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- try {
- final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
+ ProtocolTypeConstants protocolFlagEnum = null;
+ for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) {
+ if (type.getValue().equalsIgnoreCase(protocolFlag)) {
+ protocolFlagEnum = type;
}
+ }
+ if (protocolFlagEnum == null) {
+ return msgs;
+ }
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = getNoAuth(urlPath);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
+ try {
+ switch (protocolFlagEnum) {
+ case DME2:
+ dmeConfigure(timeoutMs, limit);
+ String reply = sender.sendAndWait(timeoutMs + 10000L);
+ readJsonData(msgs, getResponseDataInJson(reply));
+ break;
+ case AAF_AUTH:
+ String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
+ readJsonData(msgs, o);
+ break;
+ case AUTH_KEY:
+ final String urlKeyPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
+ timeoutMs, limit);
+ final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
+ readJsonData(msgs, authObject);
+ break;
+ case HTTPNOAUTH:
+ final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ readJsonData(msgs, getNoAuth(urlNoAuthPath));
+ break;
}
} catch (JSONException e) {
// unexpected response
reportProblemWithResponse();
- log.error("exception: ", e);
+ log.error(EXECPTION_MESSAGE, e);
} catch (HttpException e) {
throw new IOException(e);
- } catch (Exception e) {
- throw e;
}
return msgs;
}
+ private void readJsonData(LinkedList<String> msgs, JSONObject o) {
+ if (o != null) {
+ final JSONArray a = o.getJSONArray(JSON_RESULT);
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ }
+
@Override
public MRConsumerResponse fetchWithReturnConsumerResponse() {
// fetch with the timeout and limit set in constructor
@@ -324,82 +273,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(reply, mrConsumerResponse);
}
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
String response = getResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
timeoutMs, limit);
String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
@@ -418,7 +323,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
} catch (Exception e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("exception: ", e);
+ log.error(EXECPTION_MESSAGE, e);
}
mrConsumerResponse.setActualMessages(msgs);
return mrConsumerResponse;
@@ -460,7 +365,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
+ jsonObject.put(JSON_RESULT, jsonArray);
} else {
jsonObject = new JSONObject(jsonTokener);
}
@@ -484,7 +389,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
+ jsonObject.put(JSON_RESULT, jsonArray);
} else if ('{' == firstChar) {
return null;
} else if ('<' == firstChar) {
@@ -506,7 +411,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
String partner = props.getProperty("Partner");
String routeOffer = props.getProperty("routeOffer");
String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
- String protocol = props.getProperty("Protocol");
+ String protocol = props.getProperty(PROPS_PROTOCOL);
String methodType = props.getProperty("MethodType");
String dmeuser = props.getProperty("username");
String dmepassword = props.getProperty("password");
@@ -519,25 +424,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
*/
String preferredRouteKey = readRoute("preferredRouteKey");
-
+ StringBuilder contextUrl = new StringBuilder();
if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
- + "&routeoffer=" + preferredRouteKey;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
+ .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
} else if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_PARTNER).append("=").append(partner);
} else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
}
- if (timeoutMs != -1)
- url = url + "&timeout=" + timeoutMs;
- if (limit != -1)
- url = url + "&limit=" + limit;
+ if (timeoutMs != -1) {
+ contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
+ }
+ if (limit != -1) {
+ contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
+ }
// Add filter to DME2 Url
- if (fFilter != null && fFilter.length() > 0)
- url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+ if (fFilter != null && fFilter.length() > 0) {
+ contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
+ }
+
+ url = contextUrl.toString();
DMETimeOuts = new HashMap<>();
DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
@@ -642,8 +560,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
private String readRoute(String routeKey) {
- try {
- MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
+ try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
+ MRClientFactory.prop.load(input);
} catch (Exception ex) {
log.error("Reply Router Error " + ex);
}
@@ -665,7 +583,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
public static String getRouterFilePath() {
- return routerFilePath;
+ return ROUTER_FILE_PATH;
}
public static void setRouterFilePath(String routerFilePath) {
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
index 04b26d5..c1e2d12 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
@@ -42,6 +42,12 @@ import org.onap.dmaap.mr.client.MRTopicManager;
public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager
{
+ private static final String BASE_URI_TOPIC = "/topics";
+ private static final String BASE_URI_APIKEY = "/apiKeys";
+
+ private static final String PARAM_DESCRIPTION = "description";
+ private static final String PARAM_EMAIL = "email";
+
private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class);
public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException
{
@@ -54,7 +60,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
final TreeSet<String> set = new TreeSet<> ();
try
{
- final JSONObject topicSet = get ( "/topics" );
+ final JSONObject topicSet = get ( BASE_URI_TOPIC );
final JSONArray a = topicSet.getJSONArray ( "topics" );
for ( int i=0; i<a.length (); i++ )
{
@@ -83,7 +89,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
{
try
{
- final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) );
+ final JSONObject topicData = get ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) );
return new TopicInfo ()
{
@Override
@@ -95,7 +101,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
@Override
public String getDescription ()
{
- return topicData.optString ( "description", null );
+ return topicData.optString ( PARAM_DESCRIPTION, null );
}
@Override
@@ -139,13 +145,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
o.put ( "topicDescription", topicDescription );
o.put ( "partitionCount", partitionCount );
o.put ( "replicationCount", replicationCount );
- post ( "/topics/create", o, false );
+ post ( BASE_URI_TOPIC + "/create", o, false );
}
@Override
public void deleteTopic ( String topic ) throws HttpException, IOException
{
- delete ( "/topics/" + MRConstants.escape ( topic ) );
+ delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) );
}
@Override
@@ -163,13 +169,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
@Override
public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
{
- put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
}
@Override
public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException
{
- delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
+ delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
}
@Override
@@ -187,13 +193,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
@Override
public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
{
- put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
}
@Override
public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException
{
- delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
+ delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
}
@Override
@@ -202,9 +208,9 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
try
{
final JSONObject o = new JSONObject ();
- o.put ( "email", email );
- o.put ( "description", description );
- final JSONObject reply = post ( "/apiKeys/create", o, true );
+ o.put ( PARAM_EMAIL, email );
+ o.put ( PARAM_DESCRIPTION, description );
+ final JSONObject reply = post ( BASE_URI_APIKEY + "/create", o, true );
return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) );
}
catch ( JSONException e )
@@ -217,7 +223,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
@Override
public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
{
- final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) );
+ final JSONObject keyEntry = get ( BASE_URI_APIKEY + "/" + MRConstants.escape ( apiKey ) );
if ( keyEntry == null )
{
return null;
@@ -231,7 +237,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
final JSONObject aux = keyEntry.optJSONObject ( "aux" );
if ( aux != null )
{
- return aux.optString ( "email" );
+ return aux.optString ( PARAM_EMAIL );
}
return null;
}
@@ -242,7 +248,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
final JSONObject aux = keyEntry.optJSONObject ( "aux" );
if ( aux != null )
{
- return aux.optString ( "description" );
+ return aux.optString ( PARAM_DESCRIPTION );
}
return null;
}
@@ -253,14 +259,14 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden
public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException
{
final JSONObject o = new JSONObject ();
- if ( email != null ) o.put ( "email", email );
- if ( description != null ) o.put ( "description", description );
- patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o );
+ if ( email != null ) o.put ( PARAM_EMAIL, email );
+ if ( description != null ) o.put ( PARAM_DESCRIPTION, description );
+ patch ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ), o );
}
@Override
public void deleteCurrentApiKey () throws HttpException, IOException
{
- delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) );
+ delete ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ) );
}
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
index 9b969a6..bd140cd 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
@@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+ private static final String PROPS_PROTOCOL = "Protocol";
+ private static final String PROPS_PARTITION = "partition";
+ private static final String PROPS_CONTENT_TYPE = "contenttype";
+
+ private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
+ private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
+ private static final String CONTENT_TYPE_JSON = "application/json";
+ private static final String CONTENT_TYPE_TEXT = "text/plain";
+
+ private static final String JSON_STATUS = "status";
+
public static class Builder {
- public Builder() {
- }
public Builder againstUrls(Collection<String> baseUrls) {
fUrls = baseUrls;
@@ -278,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
return jsonArray;
}
+ private void logTime(long startMs, String dmeResponse) {
+ if (getLog().isInfoEnabled()) {
+ getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
+ }
+ }
+
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
@@ -291,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
host = this.fHostSelector.selectBaseHost();
}
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
os.close();
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
os.close();
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
@@ -339,15 +354,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
DME2Configue();
- Thread.sleep(5);
+ this.wait(5);
getLog().info(String
.format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
nowMs - fPending.peek().timestamp));
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
- getLog().info(logLine);
+ logTime(startMs, dmeResponse);
fPending.clear();
return true;
}
@@ -362,11 +376,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -380,11 +393,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -397,11 +409,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -424,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
host = this.fHostSelector.selectBaseHost();
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
OutputStream os = null;
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
@@ -469,7 +480,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
try {
DME2Configue();
- Thread.sleep(5);
+ this.wait(5);
getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
nowMs - fPending.peek().timestamp);
sender.setPayload(os.toString());
@@ -524,8 +535,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
return pubResponse;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result);
fPending.clear();
return pubResponse;
}
@@ -618,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
mrPubResponse.setResponseMessage("Please verify the Producer properties");
} else if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
- if (jObject.has("message") && jObject.has("status")) {
+ if (jObject.has("message") && jObject.has(JSON_STATUS)) {
String message = jObject.getString("message");
if (null != message) {
mrPubResponse.setResponseMessage(message);
}
- mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
} else {
mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
mrPubResponse.setResponseMessage(reply);
@@ -730,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
routeOffer = props.getProperty("routeOffer");
subContextPath = props.getProperty("SubContextPath") + fTopic;
- protocol = props.getProperty("Protocol");
+ protocol = props.getProperty(PROPS_PROTOCOL);
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
dmepassword = props.getProperty("password");
- contentType = props.getProperty("contenttype");
+ contentType = props.getProperty(PROPS_CONTENT_TYPE);
handlers = props.getProperty("sessionstickinessrequired");
routerFilePath = props.getProperty("DME2preferredRouterFilePath");
@@ -744,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
* routeOffer value for auto failover within a cluster
*/
- String partitionKey = props.getProperty("partition");
+ String partitionKey = props.getProperty(PROPS_PARTITION);
if (partner != null && !partner.isEmpty()) {
url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
diff --git a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java
index 5bb7dad..76abb7e 100644
--- a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java
+++ b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java
@@ -57,7 +57,7 @@ public class SimpleExampleConsumer {
logger.debug("Message Received: " + msg);
}
// Header for DME2 Call.
- MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
+ MultivaluedMap<String, Object> headersMap = MRClientFactory.getHTTPHeadersMap();
for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) {
key = entry.getKey();
logger.debug("Header Key " + key);
diff --git a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java
index de0d461..b5d83bb 100644
--- a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java
@@ -93,7 +93,7 @@ public class SimpleExamplePublisher {
}*/
if (transport.equalsIgnoreCase("HTTP")) {
- MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
+ MultivaluedMap<String, Object> headersMap = MRClientFactory.getHTTPHeadersMap();
for (String key : headersMap.keySet()) {
System.out.println("Header Key " + key);
System.out.println("Header Value " + headersMap.get(key));
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
index 2e514b0..f7341ec 100644
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
@@ -45,6 +45,9 @@ public class SimpleExampleConsumerWithReturnResponse {
long count = 0;
long nextReport = 5000;
+ // remove while true and limite execution time in seconds
+ int timeMax = 86400; // one day
+ long endDate = System.currentTimeMillis() + timeMax*1000;
final long startMs = System.currentTimeMillis ();
@@ -60,7 +63,7 @@ public class SimpleExampleConsumerWithReturnResponse {
routeReader= new FileReader(new File (routeFilePath));
props= new Properties();
final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" );
- while ( true )
+ while ( System.currentTimeMillis() < endDate )
{
MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse();
System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode());