aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml4
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientBuilders.java746
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientFactory.java1243
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java19
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java2
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java1
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java16
-rw-r--r--src/test/java/com/att/nsa/mr/client/MRClientBuildersTest.java272
-rw-r--r--src/test/java/com/att/nsa/mr/client/MRClientFactoryTest.java279
-rw-r--r--src/test/java/com/att/nsa/mr/client/impl/MRBaseClientTest.java12
-rw-r--r--version.properties2
11 files changed, 1329 insertions, 1267 deletions
diff --git a/pom.xml b/pom.xml
index 896ba04..1da2616 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
<groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
<artifactId>dmaapClient</artifactId>
<packaging>jar</packaging>
- <version>1.1.7-SNAPSHOT</version>
+ <version>1.1.8-SNAPSHOT</version>
<name>dmaap-messagerouter-dmaapclient</name>
<description>Client library for MR event routing API</description>
<url>https://github.com/att/dmaap-framework</url>
@@ -29,7 +29,7 @@
<!-- for the client library, we want to allow 1.6 or later -->
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.7</maven.compiler.source>
- <jersey.version>2.26</jersey.version>
+ <jersey.version>2.22.1</jersey.version>
<version.jackson.core>2.6.7.1</version.jackson.core>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java
index 76df039..73ef5c4 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -40,344 +42,408 @@ import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
*/
public class MRClientBuilders
{
- /**
- * A builder for a topic Consumer
- * @author author
- */
- public static class ConsumerBuilder
- {
- /**
- * Construct a consumer builder.
- */
- public ConsumerBuilder () {}
-
- /**
- * Set the host list
- * @param hostList a comma-separated list of hosts to use to connect to MR
- * @return this builder
- */
- public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
-
- /**
- * Set the host list
- * @param hostSet a set of hosts to use to connect to MR
- * @return this builder
- */
- public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
-
- /**
- * Set the topic
- * @param topic the name of the topic to consume
- * @return this builder
- */
- public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; }
-
- /**
- * Set the consumer's group and ID
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consumer in its group
- * @return this builder
- */
- public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
-
- /**
- * Set the server side timeout
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
- * @return this builder
- */
- public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; };
-
- /**
- * Set the maximum number of messages to receive per transaction
- * @param limit The maximum number of messages to receive from the server in one transaction.
- * @return this builder
- */
- public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; };
-
- /**
- * Set a filter to use on the server
- * @param filter a Highland Park standard library filter encoded in JSON
- * @return this builder
- */
- public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; }
-
- /**
- * Build the consumer
- * @return a consumer
- */
- public MRConsumer build ()
- {
- if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- if ( fGroup == null )
- {
- fGroup = UUID.randomUUID ().toString ();
- fId = "0";
- log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
- }
-
- if ( sfConsumerMock != null ) return sfConsumerMock;
- try {
- return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private Collection<String> fHosts = null;
- private String fTopic = null;
- private String fGroup = null;
- private String fId = null;
- private String fApiKey = null;
- private String fApiSecret = null;
- private int fTimeoutMs = -1;
- private int fLimit = -1;
- private String fFilter = null;
- }
-
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
-
- /**
- * A publisher builder
- * @author author
- */
- public static class PublisherBuilder
- {
- public PublisherBuilder () {}
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @return this builder
- */
- public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); }
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
- * @return this builder
- */
- public PublisherBuilder usingHosts ( String[] hostSet )
- {
- final TreeSet<String> hosts = new TreeSet<String> ();
- for ( String hp : hostSet )
- {
- hosts.add ( hp );
- }
- return usingHosts ( hosts );
- }
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
- * @return this builder
- */
- public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; }
-
- /**
- * Set the topic to publish on
- * @param topic The topic on which to publish messages.
- * @return this builder
- */
- public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; }
-
- /**
- * Batch message sends with the given limits.
- * @param messageCount The largest set of messages to batch.
- * @param ageInMs The maximum age of a message waiting in a batch.
- * @return this builder
- */
- public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; }
-
- /**
- * Compress transactions
- * @return this builder
- */
- public PublisherBuilder withCompresion () { return enableCompresion(true); }
-
- /**
- * Do not compress transactions
- * @return this builder
- */
- public PublisherBuilder withoutCompresion () { return enableCompresion(false); }
-
- /**
- * Set the compression option
- * @param compress true to gzip compress transactions
- * @return this builder
- */
- public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
-
- /**
- * Build the publisher
- * @return a batching publisher
- */
- public MRBatchingPublisher build ()
- {
- if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- if ( sfPublisherMock != null ) return sfPublisherMock;
-
- final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls ( fHosts ).
- onTopic ( fTopic ).
- batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
- compress ( fCompress ).
- build ();
- if ( fApiKey != null )
- {
- pub.setApiCredentials ( fApiKey, fApiSecret );
- }
- return pub;
- }
-
- private Collection<String> fHosts = null;
- private String fTopic = null;
- private int fMaxBatchSize = 1;
- private int fMaxBatchAgeMs = 1;
- private boolean fCompress = false;
- private String fApiKey = null;
- private String fApiSecret = null;
- }
-
- /**
- * A builder for an identity manager
- * @author author
- */
- public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
- {
- /**
- * Construct an identity manager builder.
- */
- public IdentityManagerBuilder () {}
-
- @Override
- protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
- }
-
- /**
- * A builder for a topic manager
- * @author author
- */
- public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
- {
- /**
- * Construct an topic manager builder.
- */
- public TopicManagerBuilder () {}
-
- @Override
- protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
- }
-
- /**
- * Inject a consumer. Used to support unit tests.
- * @param cc
- */
- public static void $testInject ( MRConsumer cc )
- {
- sfConsumerMock = cc;
- }
-
- /**
- * Inject a publisher. Used to support unit tests.
- * @param pub
- */
- public static void $testInject ( MRBatchingPublisher pub )
- {
- sfPublisherMock = pub;
- }
-
- static MRConsumer sfConsumerMock = null;
- static MRBatchingPublisher sfPublisherMock = null;
-
- /**
- * A builder for an identity manager
- * @author author
- */
- public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
- {
- /**
- * Construct an identity manager builder.
- */
- public AbstractAuthenticatedManagerBuilder () {}
-
- /**
- * Set the host list
- * @param hostList a comma-separated list of hosts to use to connect to MR
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
-
- /**
- * Set the host list
- * @param hostSet a set of hosts to use to connect to MR
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
-
- /**
- * Build the consumer
- * @return a consumer
- */
- public T build ()
- {
- if ( fHosts.isEmpty() )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- final T mgr = constructClient ( fHosts );
- mgr.setApiCredentials ( fApiKey, fApiSecret );
- return mgr;
- }
-
- protected abstract T constructClient ( Collection<String> hosts );
-
- private Collection<String> fHosts = null;
- private String fApiKey = null;
- private String fApiSecret = null;
- }
-
- private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
+
+ /**
+ * Instantiates MRClientBuilders.
+ */
+ private MRClientBuilders() {
+ // prevent instantiation
+ }
+
+ /**
+ * A builder for a topic Consumer
+ * @author author
+ */
+ public static class ConsumerBuilder
+ {
+ /**
+ * Construct a consumer builder.
+ */
+ public ConsumerBuilder () {}
+
+ /**
+ * Set the host list
+ * @param hostList a comma-separated list of hosts to use to connect to MR
+ * @return this builder
+ */
+ public ConsumerBuilder usingHosts ( String hostList ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ }
+
+ /**
+ * Set the host list
+ * @param hostSet a set of hosts to use to connect to MR
+ * @return this builder
+ */
+ public ConsumerBuilder usingHosts ( Collection<String> hostSet ) {
+ fHosts = hostSet; return this;
+ }
+
+ /**
+ * Set the topic
+ * @param topic the name of the topic to consume
+ * @return this builder
+ */
+ public ConsumerBuilder onTopic ( String topic ) {
+ fTopic=topic;
+ return this;
+ }
+
+ /**
+ * Set the consumer's group and ID
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consumer in its group
+ * @return this builder
+ */
+ public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) {
+ fGroup = consumerGroup;
+ fId = consumerId;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Set the server side timeout
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
+ * @return this builder
+ */
+ public ConsumerBuilder waitAtServer ( int timeoutMs ) {
+ fTimeoutMs = timeoutMs;
+ return this;
+ };
+
+ /**
+ * Set the maximum number of messages to receive per transaction
+ * @param limit The maximum number of messages to receive from the server in one transaction.
+ * @return this builder
+ */
+ public ConsumerBuilder receivingAtMost ( int limit ) {
+ fLimit = limit;
+ return this;
+ };
+
+ /**
+ * Set a filter to use on the server
+ * @param filter a Highland Park standard library filter encoded in JSON
+ * @return this builder
+ */
+ public ConsumerBuilder withServerSideFilter ( String filter ) {
+ fFilter = filter;
+ return this;
+ }
+
+ /**
+ * Build the consumer
+ * @return a consumer
+ */
+ public MRConsumer build ()
+ {
+ if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ if ( fGroup == null )
+ {
+ fGroup = UUID.randomUUID ().toString ();
+ fId = "0";
+ log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
+ }
+
+ if ( sfConsumerMock != null ) return sfConsumerMock;
+ try {
+ return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private Collection<String> fHosts = null;
+ private String fTopic = null;
+ private String fGroup = null;
+ private String fId = null;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ private int fTimeoutMs = -1;
+ private int fLimit = -1;
+ private String fFilter = null;
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+ * A publisher builder
+ * @author author
+ */
+ public static class PublisherBuilder
+ {
+ public PublisherBuilder () {}
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( String hostlist ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostlist) );
+ }
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( String[] hostSet )
+ {
+ final TreeSet<String> hosts = new TreeSet<String> ();
+ for ( String hp : hostSet )
+ {
+ hosts.add ( hp );
+ }
+ return usingHosts ( hosts );
+ }
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( Collection<String> hostlist ) {
+ fHosts=hostlist;
+ return this;
+ }
+
+ /**
+ * Set the topic to publish on
+ * @param topic The topic on which to publish messages.
+ * @return this builder
+ */
+ public PublisherBuilder onTopic ( String topic ) {
+ fTopic = topic;
+ return this;
+ }
+
+ /**
+ * Batch message sends with the given limits.
+ * @param messageCount The largest set of messages to batch.
+ * @param ageInMs The maximum age of a message waiting in a batch.
+ * @return this builder
+ */
+ public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) {
+ fMaxBatchSize = messageCount;
+ fMaxBatchAgeMs = ageInMs;
+ return this;
+ }
+
+ /**
+ * Compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder withCompresion () {
+ return enableCompresion(true);
+ }
+
+ /**
+ * Do not compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder withoutCompresion () {
+ return enableCompresion(false);
+ }
+
+ /**
+ * Set the compression option
+ * @param compress true to gzip compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder enableCompresion ( boolean compress ) {
+ fCompress = compress;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Build the publisher
+ * @return a batching publisher
+ */
+ public MRBatchingPublisher build ()
+ {
+ if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ if ( sfPublisherMock != null ) return sfPublisherMock;
+
+ final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
+ againstUrls ( fHosts ).
+ onTopic ( fTopic ).
+ batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
+ compress ( fCompress ).
+ build ();
+ if ( fApiKey != null )
+ {
+ pub.setApiCredentials ( fApiKey, fApiSecret );
+ }
+ return pub;
+ }
+
+ private Collection<String> fHosts = null;
+ private String fTopic = null;
+ private int fMaxBatchSize = 1;
+ private int fMaxBatchAgeMs = 1;
+ private boolean fCompress = false;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ }
+
+ /**
+ * A builder for an identity manager
+ * @author author
+ */
+ public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
+ {
+ /**
+ * Construct an identity manager builder.
+ */
+ public IdentityManagerBuilder () {}
+
+ @Override
+ protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ } }
+ }
+
+ /**
+ * A builder for a topic manager
+ * @author author
+ */
+ public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
+ {
+ /**
+ * Construct an topic manager builder.
+ */
+ public TopicManagerBuilder () {}
+
+ @Override
+ protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ } }
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ * @param cc
+ */
+ public static void $testInject ( MRConsumer cc )
+ {
+ sfConsumerMock = cc;
+ }
+
+ /**
+ * Inject a publisher. Used to support unit tests.
+ * @param pub
+ */
+ public static void $testInject ( MRBatchingPublisher pub )
+ {
+ sfPublisherMock = pub;
+ }
+
+ static MRConsumer sfConsumerMock = null;
+ static MRBatchingPublisher sfPublisherMock = null;
+
+ /**
+ * A builder for an identity manager
+ * @author author
+ */
+ public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
+ {
+ /**
+ * Construct an identity manager builder.
+ */
+ public AbstractAuthenticatedManagerBuilder () {}
+
+ /**
+ * Set the host list
+ * @param hostList a comma-separated list of hosts to use to connect to MR
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ }
+
+ /**
+ * Set the host list
+ * @param hostSet a set of hosts to use to connect to MR
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) {
+ fHosts = hostSet;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Build the consumer
+ * @return a consumer
+ */
+ public T build ()
+ {
+ if ( fHosts.isEmpty() )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ final T mgr = constructClient ( fHosts );
+ mgr.setApiCredentials ( fApiKey, fApiSecret );
+ return mgr;
+ }
+
+ protected abstract T constructClient ( Collection<String> hosts );
+
+ private Collection<String> fHosts = null;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
}
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
index 97aa0b8..689190e 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -58,617 +60,632 @@ import com.att.nsa.mr.tools.ValidatorUtil;
* @author author
*/
public class MRClientFactory {
- public static MultivaluedMap<String, Object> HTTPHeadersMap;
- public static Map<String, String> DME2HeadersMap;
- public static String routeFilePath;
-
- public static FileReader routeReader;
-
- public static FileWriter routeWriter = null;
- public static Properties prop = null;
-
-
- // props= new Properties();
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "hostname:8080,"
- *
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(String hostList, String topic) {
- return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
- }
-
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
- return createConsumer(hostSet, topic, null);
- }
-
- /**
- * Create a consumer instance with server-side filtering, the default
- * timeout, and no limit on messages returned. This consumer operates as an
- * independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param filter
- * a filter to use on the server side
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
- return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
- return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
- filter, apiKey, apiSecret);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
- if (MRClientBuilders.sfConsumerMock != null)
- return MRClientBuilders.sfConsumerMock;
- try {
- return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
- apiSecret);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
-
- /**
- * Create a publisher that sends each message (or group of messages)
- * immediately. Most applications should favor higher latency for much
- * higher message throughput and the "simple publisher" is not a good
- * choice.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @return a publisher
- */
- public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
- return createBatchingPublisher(hostlist, topic, 1, 1);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown. Message payloads are
- * not compressed.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs) {
- return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
- return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
- final TreeSet<String> hosts = new TreeSet<String>();
- for (String hp : hostSet) {
- hosts.add(hp);
- }
- return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
- int maxBatchSize, long maxAgeMs, boolean compress) {
- return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
- .compress(compress).build();
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param host
- * A host to be used in the URL to MR. Can be "host:port". Use
- * multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param username
- * username
- * @param password
- * password
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- * @param protocolFlag
- * http auth or ueb auth or dme2 method
- * @param producerFilePath
- * all properties for publisher
- * @return MRBatchingPublisher obj
- */
- public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
- final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
- String producerFilePath) {
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
- .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
- .compress(compress).build();
-
- pub.setHost(host);
- pub.setUsername(username);
- pub.setPassword(password);
- pub.setProtocolFlag(protocolFlag);
- return pub;
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param Properties
- * props set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
- throws FileNotFoundException, IOException {
- return createInternalBatchingPublisher(props, withResponse);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param Properties
- * props set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(Properties props)
- throws FileNotFoundException, IOException {
- return createInternalBatchingPublisher(props, false);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param producerFilePath
- * set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
- throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
- Properties props = new Properties();
- props.load(reader);
- return createBatchingPublisher(props);
- }
-
- /**
- * Create a publisher that will contain send methods that return response
- * object to user.
- *
- * @param producerFilePath
- * set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
- throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
- Properties props = new Properties();
- props.load(reader);
- return createBatchingPublisher(props, withResponse);
- }
-
- protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
- throws FileNotFoundException, IOException {
- assert props != null;
- MRSimplerBatchPublisher pub;
- if (withResponse) {
- pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
- .onTopic(props.getProperty("topic"))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
- .withResponse(withResponse).build();
- } else {
- pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
- .onTopic(props.getProperty("topic"))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
- }
- pub.setHost(props.getProperty("host"));
- if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
-
- pub.setAuthKey(props.getProperty("authKey"));
- pub.setAuthDate(props.getProperty("authDate"));
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- } else {
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- }
- pub.setProtocolFlag(props.getProperty("TransportType"));
- pub.setProps(props);
- prop = new Properties();
- if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
- routeFilePath = props.getProperty("DME2preferredRouterFilePath");
- routeReader = new FileReader(new File(routeFilePath));
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- }
- return pub;
- }
-
- /**
- * Create an identity manager client to work with API keys.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return an identity manager
- */
- public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
- MRIdentityManager cim;
- try {
- cim = new MRMetaClient(hostSet);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- cim.setApiCredentials(apiKey, apiSecret);
- return cim;
- }
-
- /**
- * Create a topic manager for working with topics.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return a topic manager
- */
- public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
- MRMetaClient tmi;
- try {
- tmi = new MRMetaClient(hostSet);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- tmi.setApiCredentials(apiKey, apiSecret);
- return tmi;
- }
-
- /**
- * Inject a consumer. Used to support unit tests.
- *
- * @param cc
- */
- public static void $testInject(MRConsumer cc) {
- MRClientBuilders.sfConsumerMock = cc;
- }
-
- public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, int i, int j, String protocalFlag, String consumerFilePath) {
-
- MRConsumerImpl sub;
- try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- sub.setUsername(username);
- sub.setPassword(password);
- sub.setHost(host);
- sub.setProtocolFlag(protocalFlag);
- sub.setConsumerFilePath(consumerFilePath);
- return sub;
-
- }
-
- public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, String protocalFlag, String consumerFilePath, int i, int j) {
-
- MRConsumerImpl sub;
- try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- sub.setUsername(username);
- sub.setPassword(password);
- sub.setHost(host);
- sub.setProtocolFlag(protocalFlag);
- sub.setConsumerFilePath(consumerFilePath);
- return sub;
-
- }
-
- public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(consumerFilePath));
- Properties props = new Properties();
- props.load(reader);
-
- return createConsumer(props);
- }
-
- public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
- int timeout;
- ValidatorUtil.validateSubscriber(props);
- if (props.getProperty("timeout") != null)
- timeout = Integer.parseInt(props.getProperty("timeout"));
- else
- timeout = -1;
- int limit;
- if (props.getProperty("limit") != null)
- limit = Integer.parseInt(props.getProperty("limit"));
- else
- limit = -1;
- String group;
- if (props.getProperty("group") == null)
- group = UUID.randomUUID().toString();
- else
- group = props.getProperty("group");
- MRConsumerImpl sub = null;
- if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty("authKey"), props.getProperty("authDate"));
- sub.setAuthKey(props.getProperty("authKey"));
- sub.setAuthDate(props.getProperty("authDate"));
- sub.setUsername(props.getProperty("username"));
- sub.setPassword(props.getProperty("password"));
- } else {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty("username"), props.getProperty("password"));
- sub.setUsername(props.getProperty("username"));
- sub.setPassword(props.getProperty("password"));
- }
- sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
- sub.setProps(props);
- sub.setHost(props.getProperty("host"));
- sub.setProtocolFlag(props.getProperty("TransportType"));
- sub.setfFilter(props.getProperty("filter"));
- routeFilePath = props.getProperty("DME2preferredRouterFilePath");
- routeReader = new FileReader(new File(routeFilePath));
- prop = new Properties();
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- return sub;
- }
+ private static final String AUTH_KEY = "authKey";
+ private static final String AUTH_DATE = "authDate";
+ private static final String PASSWORD = "password";
+ private static final String USERNAME = "username";
+ private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
+ private static final String TOPIC = "topic";
+ private static final String TRANSPORT_TYPE = "TransportType";
+ public static MultivaluedMap<String, Object> HTTPHeadersMap;
+ public static Map<String, String> DME2HeadersMap;
+ public static String routeFilePath;
+
+ public static FileReader routeReader;
+
+ public static FileWriter routeWriter = null;
+ public static Properties prop = null;
+
+ /**
+ * Instantiates MRClientFactory.
+ */
+ private MRClientFactory() {
+ //prevents instantiation.
+ }
+
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "hostname:8080,"
+ *
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(String hostList, String topic) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+ return createConsumer(hostSet, topic, null);
+ }
+
+ /**
+ * Create a consumer instance with server-side filtering, the default
+ * timeout, and no limit on messages returned. This consumer operates as an
+ * independent consumer (i.e., not in a group) and is NOT re-startable
+ * across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param filter
+ * a filter to use on the server side
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+ return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+ filter, apiKey, apiSecret);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ if (MRClientBuilders.sfConsumerMock != null)
+ return MRClientBuilders.sfConsumerMock;
+ try {
+ return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+ apiSecret);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+ * Create a publisher that sends each message (or group of messages)
+ * immediately. Most applications should favor higher latency for much
+ * higher message throughput and the "simple publisher" is not a good
+ * choice.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+ return createBatchingPublisher(hostlist, topic, 1, 1);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown. Message payloads are
+ * not compressed.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs) {
+ return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ final TreeSet<String> hosts = new TreeSet<String>();
+ for (String hp : hostSet) {
+ hosts.add(hp);
+ }
+ return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+ int maxBatchSize, long maxAgeMs, boolean compress) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param host
+ * A host to be used in the URL to MR. Can be "host:port". Use
+ * multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param username
+ * username
+ * @param password
+ * password
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ * @param protocolFlag
+ * http auth or ueb auth or dme2 method
+ * @param producerFilePath
+ * all properties for publisher
+ * @return MRBatchingPublisher obj
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+ final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
+ MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+
+ pub.setHost(host);
+ pub.setUsername(username);
+ pub.setPassword(password);
+ pub.setProtocolFlag(protocolFlag);
+ return pub;
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, withResponse);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+ return createBatchingPublisher(props);
+ }
+
+ /**
+ * Create a publisher that will contain send methods that return response
+ * object to user.
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+ return createBatchingPublisher(props, withResponse);
+ }
+
+ protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ assert props != null;
+ MRSimplerBatchPublisher pub;
+ if (withResponse) {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .onTopic(props.getProperty(TOPIC))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+ .withResponse(withResponse).build();
+ } else {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .onTopic(props.getProperty(TOPIC))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+ }
+ pub.setHost(props.getProperty("host"));
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
+ pub.setAuthKey(props.getProperty(AUTH_KEY));
+ pub.setAuthDate(props.getProperty(AUTH_DATE));
+ pub.setUsername(props.getProperty(USERNAME));
+ pub.setPassword(props.getProperty(PASSWORD));
+ } else {
+ pub.setUsername(props.getProperty(USERNAME));
+ pub.setPassword(props.getProperty(PASSWORD));
+ }
+ pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
+ pub.setProps(props);
+ prop = new Properties();
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
+ routeReader = new FileReader(new File(routeFilePath));
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ }
+ return pub;
+ }
+
+ /**
+ * Create an identity manager client to work with API keys.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return an identity manager
+ */
+ public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
+ MRIdentityManager cim;
+ try {
+ cim = new MRMetaClient(hostSet);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ cim.setApiCredentials(apiKey, apiSecret);
+ return cim;
+ }
+
+ /**
+ * Create a topic manager for working with topics.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return a topic manager
+ */
+ public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
+ MRMetaClient tmi;
+ try {
+ tmi = new MRMetaClient(hostSet);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ tmi.setApiCredentials(apiKey, apiSecret);
+ return tmi;
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ *
+ * @param cc
+ */
+ public static void $testInject(MRConsumer cc) {
+ MRClientBuilders.sfConsumerMock = cc;
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, int i, int j, String protocalFlag, String consumerFilePath) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, String protocalFlag, String consumerFilePath, int i, int j) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(consumerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+
+ return createConsumer(props);
+ }
+
+ public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
+ int timeout;
+ ValidatorUtil.validateSubscriber(props);
+ if (props.getProperty("timeout") != null)
+ timeout = Integer.parseInt(props.getProperty("timeout"));
+ else
+ timeout = -1;
+ int limit;
+ if (props.getProperty("limit") != null)
+ limit = Integer.parseInt(props.getProperty("limit"));
+ else
+ limit = -1;
+ String group;
+ if (props.getProperty("group") == null)
+ group = UUID.randomUUID().toString();
+ else
+ group = props.getProperty("group");
+ MRConsumerImpl sub = null;
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
+ sub.setAuthKey(props.getProperty(AUTH_KEY));
+ sub.setAuthDate(props.getProperty(AUTH_DATE));
+ sub.setUsername(props.getProperty(USERNAME));
+ sub.setPassword(props.getProperty(PASSWORD));
+ } else {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty(USERNAME), props.getProperty(PASSWORD));
+ sub.setUsername(props.getProperty(USERNAME));
+ sub.setPassword(props.getProperty(PASSWORD));
+ }
+
+ sub.setProps(props);
+ sub.setHost(props.getProperty("host"));
+ sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
+ sub.setfFilter(props.getProperty("filter"));
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
+ routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ }
+
+ return sub;
+ }
}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
index fb49096..76bf5ce 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
@@ -261,19 +261,14 @@ public class MRBaseClient extends HttpClient implements MRClient {
}
}
- public JSONObject getNoAuth(final String path, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- if (null != username && null != password) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response = DmaapClientUtil.getResponsewtNoAuth(target);
+ public JSONObject getNoAuth(final String path) throws HttpException, JSONException {
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
- }
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(path);
+ response = DmaapClientUtil.getResponsewtNoAuth(target);
+
+ return getResponseDataInJson(response);
}
public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
index 72d97c9..bc156ea 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
@@ -200,7 +200,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
try {
- final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+ final JSONObject o = getNoAuth(urlPath);
if (o != null) {
final JSONArray a = o.getJSONArray("result");
if (a != null) {
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
index 8509042..4f44d30 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -864,6 +864,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
send(false);
}
}, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
+ pubResponse = new MRPublisherResponse();
}
private static class TimestampedMessage extends message {
diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
index 900c932..0539582 100644
--- a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
+++ b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
@@ -64,10 +64,6 @@ public class ValidatorUtil {
if (id == null || id.isEmpty()) {
throw new IllegalArgumentException ( "Consumer (Id) is needed" );
}
- String sessionstickinessrequired = props.getProperty("sessionstickinessrequired");
- if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) {
- throw new IllegalArgumentException ( "sessionstickinessrequired is needed" );
- }
}
private static void validateForDME2(Properties props) {
@@ -132,6 +128,10 @@ public class ValidatorUtil {
if (subContextPath == null || subContextPath.isEmpty()) {
throw new IllegalArgumentException ( "SubContextPath is needed" );
}
+ String sessionstickinessrequired = props.getProperty("sessionstickinessrequired");
+ if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) {
+ throw new IllegalArgumentException ( "sessionstickinessrequired is needed" );
+ }
}
private static void validateForNonDME2(Properties props) {
@@ -144,14 +144,11 @@ public class ValidatorUtil {
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException ( "topic is needed" );
}
- String methodType = props.getProperty("MethodType");
- if (methodType == null || methodType.isEmpty()) {
- throw new IllegalArgumentException ( "MethodType is needed" );
- }
String contenttype = props.getProperty("contenttype");
if (contenttype == null || contenttype.isEmpty()) {
throw new IllegalArgumentException ( "contenttype is needed" );
}
+ if (!ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(transportType)){
String username = props.getProperty("username");
if (username == null || username.isEmpty()) {
throw new IllegalArgumentException ( "username is needed" );
@@ -160,6 +157,7 @@ public class ValidatorUtil {
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException ( "password is needed" );
}
+ }
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) {
String authKey = props.getProperty("authKey");
if (authKey == null || authKey.isEmpty()) {
@@ -167,7 +165,7 @@ public class ValidatorUtil {
}
String authDate = props.getProperty("authDate");
if (authDate == null || authDate.isEmpty()) {
- throw new IllegalArgumentException ( "password is needed" );
+ throw new IllegalArgumentException ( "authDate is needed" );
}
}
diff --git a/src/test/java/com/att/nsa/mr/client/MRClientBuildersTest.java b/src/test/java/com/att/nsa/mr/client/MRClientBuildersTest.java
index 2e7d05d..9a0a3ce 100644
--- a/src/test/java/com/att/nsa/mr/client/MRClientBuildersTest.java
+++ b/src/test/java/com/att/nsa/mr/client/MRClientBuildersTest.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -36,215 +38,211 @@ import com.att.nsa.mr.client.MRClientBuilders;
public class MRClientBuildersTest {
- private Collection<String> hostSet = new ArrayList<String>();
- private MRClientBuilders.ConsumerBuilder builder = null;
- private MRClientBuilders.PublisherBuilder pBuilder = null;
- private MRClientBuilders mrcBuilders = null;
-
- private String[] hostArray = new String[10];
-
- @Before
- public void setUp() throws Exception {
+ private Collection<String> hostSet = new ArrayList<String>();
+ private MRClientBuilders.ConsumerBuilder builder = null;
+ private MRClientBuilders.PublisherBuilder pBuilder = null;
+ private String[] hostArray = new String[10];
- for (int i = 0; i < 10; i++) {
- hostSet.add("host" + (i + 1));
- hostArray[i] = "host" + (i + 1);
- }
+ @Before
+ public void setUp() throws Exception {
- builder = new MRClientBuilders.ConsumerBuilder();
+ for (int i = 0; i < 10; i++) {
+ hostSet.add("host" + (i + 1));
+ hostArray[i] = "host" + (i + 1);
+ }
- pBuilder = new MRClientBuilders.PublisherBuilder();
+ builder = new MRClientBuilders.ConsumerBuilder();
- mrcBuilders = new MRClientBuilders();
+ pBuilder = new MRClientBuilders.PublisherBuilder();
- }
+ }
- @After
- public void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
- }
+ }
- @Test
- public void testUsingHosts() {
+ @Test
+ public void testUsingHosts() {
- builder.usingHosts("hostList");
- assertTrue(true);
+ builder.usingHosts("hostList");
+ assertTrue(true);
- }
+ }
- @Test
- public void testUsingHosts2() {
+ @Test
+ public void testUsingHosts2() {
- builder.usingHosts(hostSet);
- assertTrue(true);
+ builder.usingHosts(hostSet);
+ assertTrue(true);
- }
+ }
- @Test
- public void testOnTopic() {
+ @Test
+ public void testOnTopic() {
- builder.onTopic("testTopic");
- assertTrue(true);
+ builder.onTopic("testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testKnownAs() {
+ @Test
+ public void testKnownAs() {
- builder.knownAs("CG1", "23");
- assertTrue(true);
+ builder.knownAs("CG1", "23");
+ assertTrue(true);
- }
+ }
- @Test
- public void testAuthenticatedBy() {
+ @Test
+ public void testAuthenticatedBy() {
- builder.authenticatedBy("apikey", "apisecret");
- assertTrue(true);
+ builder.authenticatedBy("apikey", "apisecret");
+ assertTrue(true);
- }
+ }
- @Test
- public void testWaitAtServer() {
+ @Test
+ public void testWaitAtServer() {
- builder.waitAtServer(100);
- assertTrue(true);
+ builder.waitAtServer(100);
+ assertTrue(true);
- }
+ }
- @Test
- public void testReceivingAtMost() {
+ @Test
+ public void testReceivingAtMost() {
- builder.receivingAtMost(100);
- assertTrue(true);
+ builder.receivingAtMost(100);
+ assertTrue(true);
- }
+ }
- @Test
- public void testWithServerSideFilter() {
+ @Test
+ public void testWithServerSideFilter() {
- builder.withServerSideFilter("filter");
- assertTrue(true);
+ builder.withServerSideFilter("filter");
+ assertTrue(true);
- }
+ }
- @Test
- public void testBuild() {
+ @Test
+ public void testBuild() {
- try {
+ try {
- builder.build();
- } catch (IllegalArgumentException e) {
- assertTrue(true);
- }
+ builder.build();
+ } catch (IllegalArgumentException e) {
+ assertTrue(true);
+ }
- }
+ }
- @Test
- public void testUsingHosts3() {
+ @Test
+ public void testUsingHosts3() {
- pBuilder.usingHosts("testTopic");
- assertTrue(true);
+ pBuilder.usingHosts("testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testUsingHosts4() {
+ @Test
+ public void testUsingHosts4() {
- pBuilder.usingHosts(hostArray);
- assertTrue(true);
+ pBuilder.usingHosts(hostArray);
+ assertTrue(true);
- }
+ }
- @Test
- public void testUsingHosts5() {
+ @Test
+ public void testUsingHosts5() {
- pBuilder.usingHosts(hostSet);
- assertTrue(true);
+ pBuilder.usingHosts(hostSet);
+ assertTrue(true);
- }
+ }
- @Test
- public void testOnTopic2() {
+ @Test
+ public void testOnTopic2() {
- pBuilder.onTopic("testTopic");
- assertTrue(true);
+ pBuilder.onTopic("testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testLimitBatch() {
+ @Test
+ public void testLimitBatch() {
- pBuilder.limitBatch(100, 10);
- assertTrue(true);
+ pBuilder.limitBatch(100, 10);
+ assertTrue(true);
- }
+ }
- @Test
- public void testWithCompresion() {
+ @Test
+ public void testWithCompresion() {
- pBuilder.withCompresion();
- assertTrue(true);
+ pBuilder.withCompresion();
+ assertTrue(true);
- }
+ }
- @Test
- public void testWithoutCompresion() {
+ @Test
+ public void testWithoutCompresion() {
- pBuilder.withoutCompresion();
- assertTrue(true);
+ pBuilder.withoutCompresion();
+ assertTrue(true);
- }
+ }
- @Test
- public void testEnableCompresion() {
+ @Test
+ public void testEnableCompresion() {
- pBuilder.enableCompresion(true);
- assertTrue(true);
+ pBuilder.enableCompresion(true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testAuthenticatedBy2() {
+ @Test
+ public void testAuthenticatedBy2() {
- pBuilder.authenticatedBy("apikey", "apisecret");
- assertTrue(true);
+ pBuilder.authenticatedBy("apikey", "apisecret");
+ assertTrue(true);
- }
+ }
- @Test
- public void testBuild2() {
+ @Test
+ public void testBuild2() {
- try {
+ try {
- pBuilder.build();
- } catch (IllegalArgumentException e) {
- assertTrue(true);
- }
+ pBuilder.build();
+ } catch (IllegalArgumentException e) {
+ assertTrue(true);
+ }
- }
+ }
- @Test
- public void test$testInject() {
+ @Test
+ public void test$testInject() {
- try {
+ try {
- mrcBuilders.$testInject(builder.build());
- } catch (IllegalArgumentException e) {
- assertTrue(true);
- }
+ MRClientBuilders.$testInject(builder.build());
+ } catch (IllegalArgumentException e) {
+ assertTrue(true);
+ }
- }
+ }
- @Test
- public void test$testInject2() {
+ @Test
+ public void test$testInject2() {
- try {
+ try {
- mrcBuilders.$testInject(pBuilder.build());
- } catch (IllegalArgumentException e) {
- assertTrue(true);
- }
+ MRClientBuilders.$testInject(pBuilder.build());
+ } catch (IllegalArgumentException e) {
+ assertTrue(true);
+ }
- }
+ }
}
diff --git a/src/test/java/com/att/nsa/mr/client/MRClientFactoryTest.java b/src/test/java/com/att/nsa/mr/client/MRClientFactoryTest.java
index 08bf23a..7ea091b 100644
--- a/src/test/java/com/att/nsa/mr/client/MRClientFactoryTest.java
+++ b/src/test/java/com/att/nsa/mr/client/MRClientFactoryTest.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -38,218 +40,213 @@ import com.att.nsa.mr.client.MRClientFactory;
public class MRClientFactoryTest {
- private Collection<String> hostSet = new ArrayList<String>();
-
- private MRClientFactory factory = null;
-
- private String[] hostArray = new String[10];
-
- @Before
- public void setUp() throws Exception {
+ private Collection<String> hostSet = new ArrayList<String>();
- for (int i = 0; i < 10; i++) {
- hostSet.add("host" + (i + 1));
- hostArray[i] = "host" + (i + 1);
- }
+ private String[] hostArray = new String[10];
- factory = new MRClientFactory();
+ @Before
+ public void setUp() throws Exception {
- }
+ for (int i = 0; i < 10; i++) {
+ hostSet.add("host" + (i + 1));
+ hostArray[i] = "host" + (i + 1);
+ }
+ }
- @After
- public void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
- }
+ }
- @Test
- public void testCreateConsumer() {
+ @Test
+ public void testCreateConsumer() {
- MRClientFactory.createConsumer("hostList hostList2", "testTopic");
- assertTrue(true);
+ MRClientFactory.createConsumer("hostList hostList2", "testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer2() {
+ @Test
+ public void testCreateConsumer2() {
- MRClientFactory.createConsumer(hostSet, "testTopic");
- assertTrue(true);
+ MRClientFactory.createConsumer(hostSet, "testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer3() {
+ @Test
+ public void testCreateConsumer3() {
- MRClientFactory.createConsumer(hostSet, "testTopic", "filter");
- assertTrue(true);
+ MRClientFactory.createConsumer(hostSet, "testTopic", "filter");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer4() {
+ @Test
+ public void testCreateConsumer4() {
- MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22");
- assertTrue(true);
+ MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer5() {
+ @Test
+ public void testCreateConsumer5() {
- MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22", 100, 100);
- assertTrue(true);
+ MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22", 100, 100);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer6() {
+ @Test
+ public void testCreateConsumer6() {
- MRClientFactory.createConsumer("hostList", "testTopic", "CG1", "22", 100, 100, "filter", "apikey", "apisecret");
- assertTrue(true);
+ MRClientFactory.createConsumer("hostList", "testTopic", "CG1", "22", 100, 100, "filter", "apikey", "apisecret");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer7() {
+ @Test
+ public void testCreateConsumer7() {
- MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22", 100, 100, "filter", "apikey", "apisecret");
- assertTrue(true);
+ MRClientFactory.createConsumer(hostSet, "testTopic", "CG1", "22", 100, 100, "filter", "apikey", "apisecret");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateSimplePublisher() {
+ @Test
+ public void testCreateSimplePublisher() {
- MRClientFactory.createSimplePublisher("hostList", "testTopic");
- assertTrue(true);
+ MRClientFactory.createSimplePublisher("hostList", "testTopic");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher1() {
+ @Test
+ public void testCreateBatchingPublisher1() {
- MRClientFactory.createBatchingPublisher("hostList", "testTopic", 100, 10);
- assertTrue(true);
+ MRClientFactory.createBatchingPublisher("hostList", "testTopic", 100, 10);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher2() {
+ @Test
+ public void testCreateBatchingPublisher2() {
- MRClientFactory.createBatchingPublisher("hostList", "testTopic", 100, 10, true);
- assertTrue(true);
+ MRClientFactory.createBatchingPublisher("hostList", "testTopic", 100, 10, true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher3() {
+ @Test
+ public void testCreateBatchingPublisher3() {
- MRClientFactory.createBatchingPublisher(hostArray, "testTopic", 100, 10, true);
- assertTrue(true);
+ MRClientFactory.createBatchingPublisher(hostArray, "testTopic", 100, 10, true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher4() {
+ @Test
+ public void testCreateBatchingPublisher4() {
- MRClientFactory.createBatchingPublisher(hostSet, "testTopic", 100, 10, true);
- assertTrue(true);
+ MRClientFactory.createBatchingPublisher(hostSet, "testTopic", 100, 10, true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher5() {
+ @Test
+ public void testCreateBatchingPublisher5() {
- MRClientFactory.createBatchingPublisher("host", "testTopic", "username", "password", 100, 10, true,
- "protocolFlag", "/producer");
- assertTrue(true);
+ MRClientFactory.createBatchingPublisher("host", "testTopic", "username", "password", 100, 10, true,
+ "protocolFlag");
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher6() {
+ @Test
+ public void testCreateBatchingPublisher6() {
- try {
- MRClientFactory.createBatchingPublisher("/producer");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(true);
+ try {
+ MRClientFactory.createBatchingPublisher("/producer");
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateBatchingPublisher7() {
+ @Test
+ public void testCreateBatchingPublisher7() {
- try {
- MRClientFactory.createBatchingPublisher("/producer", true);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(true);
+ try {
+ MRClientFactory.createBatchingPublisher("/producer", true);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateIdentityManager() {
+ @Test
+ public void testCreateIdentityManager() {
- MRClientFactory.createIdentityManager(hostSet, "apikey", "apisecret");
+ MRClientFactory.createIdentityManager(hostSet, "apikey", "apisecret");
- assertTrue(true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateTopicManager() {
+ @Test
+ public void testCreateTopicManager() {
- MRClientFactory.createTopicManager(hostSet, "apikey", "apisecret");
+ MRClientFactory.createTopicManager(hostSet, "apikey", "apisecret");
- assertTrue(true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer8() {
+ @Test
+ public void testCreateConsumer8() {
- try {
- MRClientFactory.createConsumer("/consumer");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ try {
+ MRClientFactory.createConsumer("/consumer");
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- assertTrue(true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer9() {
+ @Test
+ public void testCreateConsumer9() {
- MRClientFactory.createConsumer("host", "topic", "username", "password", "group", "23", "protocolFlag",
- "/consumer", 1, 2);
+ MRClientFactory.createConsumer("host", "topic", "username", "password", "group", "23", "protocolFlag",
+ "/consumer", 1, 2);
- assertTrue(true);
+ assertTrue(true);
- }
+ }
- @Test
- public void testCreateConsumer10() {
+ @Test
+ public void testCreateConsumer10() {
- MRClientFactory.createConsumer("host", "topic", "username", "password", "group", "23", 1, 2, "protocolFlag",
- "/consumer");
+ MRClientFactory.createConsumer("host", "topic", "username", "password", "group", "23", 1, 2, "protocolFlag",
+ "/consumer");
- assertTrue(true);
+ assertTrue(true);
- }
-
- @Test
- public void test$testInject() {
+ }
+
+ @Test
+ public void test$testInject() {
- MRClientFactory.$testInject(null);
- assertTrue(true);
+ MRClientFactory.$testInject(null);
+ assertTrue(true);
- }
+ }
} \ No newline at end of file
diff --git a/src/test/java/com/att/nsa/mr/client/impl/MRBaseClientTest.java b/src/test/java/com/att/nsa/mr/client/impl/MRBaseClientTest.java
index 56cf954..055c94f 100644
--- a/src/test/java/com/att/nsa/mr/client/impl/MRBaseClientTest.java
+++ b/src/test/java/com/att/nsa/mr/client/impl/MRBaseClientTest.java
@@ -448,21 +448,11 @@ public class MRBaseClientTest {
PowerMockito.when(response.getHeaders()).thenReturn(map);
PowerMockito.when(DmaapClientUtil.getResponsewtNoAuth(DmaapClientUtil.getTarget("/path"))).thenReturn(response);
- mrBaseClient.getNoAuth("/path", "username", "password", "HTTPAUTH");
+ mrBaseClient.getNoAuth("/path");
assertTrue(true);
}
- @Test(expected = HttpException.class)
- public void testGetNoAuth_error() throws JSONException, HttpException {
-
- ResponseBuilder responseBuilder = Response.ok();
- PowerMockito.when(DmaapClientUtil.getResponsewtNoAuth(DmaapClientUtil.getTarget("/path"))).thenReturn(
- responseBuilder.header("transactionid", "transactionid").entity("{\"test\":\"test\"}").build());
- mrBaseClient.getNoAuth("/path", null, null, "HTTPAUTH");
- assertTrue(true);
-
- }
@Test
public void testGetHTTPErrorResponseMessage() {
diff --git a/version.properties b/version.properties
index ccd6e4a..c31717c 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=1
-patch=6
+patch=8
base_version=${major}.${minor}.${patch}