aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/nsa/mr/client/MRClientFactory.java')
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientFactory.java721
1 files changed, 417 insertions, 304 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
index 59e472c..b654282 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
@@ -43,158 +43,205 @@ import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
/**
* A factory for MR clients.<br/>
* <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
- * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
- * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
- * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
- * them. Be sure to use a different ID for each instance.)<br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each
+ * instance.)<br/>
* <br/>
- * Publishers
+ * Publishers
*
* @author author
*/
-public class MRClientFactory
-{
+public class MRClientFactory {
public static MultivaluedMap<String, Object> HTTPHeadersMap;
public static Map<String, String> DME2HeadersMap;
public static String routeFilePath;
-
+
public static FileReader routeReader;
-
- public static FileWriter routeWriter= null;
- public static Properties prop=null;
- //routeReader= new FileReader(new File (routeFilePath));
- //props= new Properties();
+
+ public static FileWriter routeWriter = null;
+ public static Properties prop = null;
+
+ // routeReader= new FileReader(new File (routeFilePath));
+ // props= new Properties();
/**
- * Create a consumer instance with the default timeout and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
*
- * @param hostList A comma separated list of hosts to use to connect to MR.
- * You can include port numbers (3904 is the default). For example, "hostname:8080,"
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "hostname:8080,"
*
- * @param topic The topic to consume
+ * @param topic
+ * The topic to consume
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( String hostList, String topic )
- {
- return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
+ public static MRConsumer createConsumer(String hostList, String topic) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
}
/**
- * Create a consumer instance with the default timeout and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
- {
- return createConsumer ( hostSet, topic, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+ return createConsumer(hostSet, topic, null);
}
/**
- * Create a consumer instance with server-side filtering, the default timeout, and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
+ * Create a consumer instance with server-side filtering, the default
+ * timeout, and no limit on messages returned. This consumer operates as an
+ * independent consumer (i.e., not in a group) and is NOT re-startable
* across sessions.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param filter a filter to use on the server side
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param filter
+ * a filter to use on the server side
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
- {
- return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+ return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart.
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
- {
- return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart.
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
- {
- return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart. This consumer also uses
- * server-side filtering.
- *
- * @param hostList A comma separated list of hosts to use to connect to MR.
- * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
- {
- return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
- consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+ public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+ filter, apiKey, apiSecret);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart. This consumer also uses
- * server-side filtering.
- *
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
- {
- if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ if (MRClientBuilders.sfConsumerMock != null)
+ return MRClientBuilders.sfConsumerMock;
try {
- return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+ return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+ apiSecret);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -203,282 +250,339 @@ public class MRClientFactory
/*************************************************************************/
/*************************************************************************/
/*************************************************************************/
-
+
/**
- * Create a publisher that sends each message (or group of messages) immediately. Most
- * applications should favor higher latency for much higher message throughput and the
- * "simple publisher" is not a good choice.
- *
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
+ * Create a publisher that sends each message (or group of messages)
+ * immediately. Most applications should favor higher latency for much
+ * higher message throughput and the "simple publisher" is not a good
+ * choice.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
* @return a publisher
*/
- public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
- {
- return createBatchingPublisher ( hostlist, topic, 1, 1 );
+ public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+ return createBatchingPublisher(hostlist, topic, 1, 1);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown. Message payloads are
+ * not compressed.
*
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
- {
- return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs) {
+ return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- final TreeSet<String> hosts = new TreeSet<String> ();
- for ( String hp : hostSet )
- {
- hosts.add ( hp );
+ public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ final TreeSet<String> hosts = new TreeSet<String>();
+ for (String hp : hostSet) {
+ hosts.add(hp);
}
- return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
+ return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- return new MRSimplerBatchPublisher.Builder ().
- againstUrls ( hostSet ).
- onTopic ( topic ).
- batchTo ( maxBatchSize, maxAgeMs ).
- compress ( compress ).
- build ();
+ public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+ int maxBatchSize, long maxAgeMs, boolean compress) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
}
-
+
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
- * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param username username
- * @param password password
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
- * @param protocolFlag http auth or ueb auth or dme2 method
- * @param producerFilePath all properties for publisher
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param host
+ * A host to be used in the URL to MR. Can be "host:port". Use
+ * multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param username
+ * username
+ * @param password
+ * password
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ * @param protocolFlag
+ * http auth or ueb auth or dme2 method
+ * @param producerFilePath
+ * all properties for publisher
* @return MRBatchingPublisher obj
*/
- public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
- {
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(host)).
- onTopic ( topic ).
- batchTo ( maxBatchSize, maxAgeMs ).
- compress ( compress ).
- build ();
-
+ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+ final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
+ String producerFilePath) {
+ MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+
pub.setHost(host);
pub.setUsername(username);
pub.setPassword(password);
pub.setProtocolFlag(protocolFlag);
- pub.setProducerFilePath(producerFilePath);
return pub;
}
-
-
+
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown
- * @param producerFilePath set all properties for publishing message
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException exc
- * @throws IOException ioex
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
*/
- public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
+ public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, withResponse);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
props.load(reader);
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
- onTopic ( props.getProperty("topic") ).
- batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
- compress (Boolean.parseBoolean(props.getProperty("compress"))).
- httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
- build ();
- pub.setHost(props.getProperty("host"));
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-
- pub.setAuthKey(props.getProperty("authKey"));
- pub.setAuthDate(props.getProperty("authDate"));
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- }else{
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- }
- pub.setProducerFilePath(producerFilePath);
- pub.setProtocolFlag(props.getProperty("TransportType"));
- pub.setProps(props);
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
- }
- //pub.setContentType(contentType);
- return pub;
+ return createBatchingPublisher(props);
}
-
+
/**
- * Create a publisher that will contain send methods that return
- * response object to user.
- * @param producerFilePath set all properties for publishing message
+ * Create a publisher that will contain send methods that return response
+ * object to user.
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException exc
- * @throws IOException ioex
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
*/
- public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
props.load(reader);
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
- onTopic ( props.getProperty("topic") ).
- batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
- compress (Boolean.parseBoolean(props.getProperty("compress"))).
- httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
- withResponse(withResponse).
- build ();
+ return createBatchingPublisher(props, withResponse);
+ }
+
+ protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ assert props != null;
+ MRSimplerBatchPublisher pub;
+ if (withResponse) {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .onTopic(props.getProperty("topic"))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+ .withResponse(withResponse).build();
+ } else {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .onTopic(props.getProperty("topic"))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+ }
pub.setHost(props.getProperty("host"));
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-
+ if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
pub.setAuthKey(props.getProperty("authKey"));
pub.setAuthDate(props.getProperty("authDate"));
pub.setUsername(props.getProperty("username"));
pub.setPassword(props.getProperty("password"));
- }else{
+ } else {
pub.setUsername(props.getProperty("username"));
pub.setPassword(props.getProperty("password"));
}
- pub.setProducerFilePath(producerFilePath);
pub.setProtocolFlag(props.getProperty("TransportType"));
pub.setProps(props);
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
+ routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
}
- //pub.setContentType(contentType);
return pub;
}
-
-
-
-
-
-
-
-
-
-
/**
* Create an identity manager client to work with API keys.
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param apiKey Your API key
- * @param apiSecret Your API secret
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
* @return an identity manager
*/
- public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
- {
+ public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
MRIdentityManager cim;
try {
- cim = new MRMetaClient ( hostSet );
+ cim = new MRMetaClient(hostSet);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- cim.setApiCredentials ( apiKey, apiSecret );
+ cim.setApiCredentials(apiKey, apiSecret);
return cim;
}
/**
* Create a topic manager for working with topics.
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param apiKey Your API key
- * @param apiSecret Your API secret
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
* @return a topic manager
*/
- public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
- {
+ public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
MRMetaClient tmi;
try {
- tmi = new MRMetaClient ( hostSet );
+ tmi = new MRMetaClient(hostSet);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- tmi.setApiCredentials ( apiKey, apiSecret );
+ tmi.setApiCredentials(apiKey, apiSecret);
return tmi;
}
/**
* Inject a consumer. Used to support unit tests.
+ *
* @param cc
*/
- public static void $testInject ( MRConsumer cc )
- {
+ public static void $testInject(MRConsumer cc) {
MRClientBuilders.sfConsumerMock = cc;
}
- public static MRConsumer createConsumer(String host, String topic, String username,
- String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, int i, int j, String protocalFlag, String consumerFilePath) {
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -488,15 +592,15 @@ public class MRClientFactory
sub.setProtocolFlag(protocalFlag);
sub.setConsumerFilePath(consumerFilePath);
return sub;
-
+
}
-
- public static MRConsumer createConsumer(String host, String topic, String username,
- String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, String protocalFlag, String consumerFilePath, int i, int j) {
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -506,52 +610,61 @@ public class MRClientFactory
sub.setProtocolFlag(protocalFlag);
sub.setConsumerFilePath(consumerFilePath);
return sub;
-
+
}
- public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (consumerFilePath));
- Properties props = new Properties();
+ public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(consumerFilePath));
+ Properties props = new Properties();
props.load(reader);
+
+ return createConsumer(props);
+ }
+
+ public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
int timeout;
- if(props.getProperty("timeout")!=null)
- timeout=Integer.parseInt(props.getProperty("timeout"));
+ if (props.getProperty("timeout") != null)
+ timeout = Integer.parseInt(props.getProperty("timeout"));
else
- timeout=-1;
+ timeout = -1;
int limit;
- if(props.getProperty("limit")!=null)
- limit=Integer.parseInt(props.getProperty("limit"));
+ if (props.getProperty("limit") != null)
+ limit = Integer.parseInt(props.getProperty("limit"));
else
- limit=-1;
+ limit = -1;
String group;
- if(props.getProperty("group")==null)
- group=UUID.randomUUID ().toString();
+ if (props.getProperty("group") == null)
+ group = UUID.randomUUID().toString();
else
- group=props.getProperty("group");
- MRConsumerImpl sub=null;
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") );
+ group = props.getProperty("group");
+ MRConsumerImpl sub = null;
+ if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty("authKey"), props.getProperty("authDate"));
sub.setAuthKey(props.getProperty("authKey"));
sub.setAuthDate(props.getProperty("authDate"));
sub.setUsername(props.getProperty("username"));
sub.setPassword(props.getProperty("password"));
- }else{
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") );
+ } else {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty("username"), props.getProperty("password"));
sub.setUsername(props.getProperty("username"));
sub.setPassword(props.getProperty("password"));
}
sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
- sub.setProps(props);
+ sub.setProps(props);
sub.setHost(props.getProperty("host"));
sub.setProtocolFlag(props.getProperty("TransportType"));
- //sub.setConsumerFilePath(consumerFilePath);
+ // sub.setConsumerFilePath(consumerFilePath);
sub.setfFilter(props.getProperty("filter"));
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
+ routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
}
return sub;
}