From 85c21e1d85c545717affd3f18cd8e9fe6dc14562 Mon Sep 17 00:00:00 2001 From: "sunil.unnava" Date: Tue, 23 Jan 2018 15:26:15 -0500 Subject: Changes to the DMaap Client Added new API to the DMaapClient Issue-ID: DMAAP-214 Change-Id: I4de2da7ca42ad1b5925a2df9d26672875dd15b10 Signed-off-by: sunil.unnava --- .../com/att/nsa/mr/client/MRClientFactory.java | 721 ++++++++++++--------- 1 file changed, 417 insertions(+), 304 deletions(-) (limited to 'src/main/java/com/att/nsa/mr/client/MRClientFactory.java') diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java index 59e472c..b654282 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -43,158 +43,205 @@ import com.att.nsa.mr.test.clients.ProtocolTypeConstants; /** * A factory for MR clients.
*
- * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates - * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive - * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's - * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across - * them. Be sure to use a different ID for each instance.)
+ * Use caution selecting a consumer creator factory. If the call doesn't accept + * a consumer group name, then it creates a consumer that is not restartable. + * That is, if you stop your process and start it again, your client will NOT + * receive any missed messages on the topic. If you need to ensure receipt of + * missed messages, then you must use a consumer that's created with a group + * name and ID. (If you create multiple consumer processes using the same group, + * load is split across them. Be sure to use a different ID for each + * instance.)
*
- * Publishers + * Publishers * * @author author */ -public class MRClientFactory -{ +public class MRClientFactory { public static MultivaluedMap HTTPHeadersMap; public static Map DME2HeadersMap; public static String routeFilePath; - + public static FileReader routeReader; - - public static FileWriter routeWriter= null; - public static Properties prop=null; - //routeReader= new FileReader(new File (routeFilePath)); - //props= new Properties(); + + public static FileWriter routeWriter = null; + public static Properties prop = null; + + // routeReader= new FileReader(new File (routeFilePath)); + // props= new Properties(); /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "hostname:8080," + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "hostname:8080," * - * @param topic The topic to consume + * @param topic + * The topic to consume * * @return a consumer */ - public static MRConsumer createConsumer ( String hostList, String topic ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); + public static MRConsumer createConsumer(String hostList, String topic) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic); } /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume * * @return a consumer */ - public static MRConsumer createConsumer ( Collection hostSet, String topic ) - { - return createConsumer ( hostSet, topic, null ); + public static MRConsumer createConsumer(Collection hostSet, String topic) { + return createConsumer(hostSet, topic, null); } /** - * Create a consumer instance with server-side filtering, the default timeout, and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * Create a consumer instance with server-side filtering, the default + * timeout, and no limit on messages returned. This consumer operates as an + * independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param filter a filter to use on the server side + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param filter + * a filter to use on the server side * * @return a consumer */ - public static MRConsumer createConsumer ( Collection hostSet, String topic, String filter ) - { - return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); + public static MRConsumer createConsumer(Collection hostSet, String topic, String filter) { + return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group * * @return a consumer */ - public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId ) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); + public static MRConsumer createConsumer(Collection hostSet, final String topic, final String consumerGroup, + final String consumerId) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. * * @return a consumer */ - public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); + public static MRConsumer createConsumer(Collection hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * * @return a consumer */ - public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, - consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, + filter, apiKey, apiSecret); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * * @return a consumer */ - public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; + public static MRConsumer createConsumer(Collection hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + if (MRClientBuilders.sfConsumerMock != null) + return MRClientBuilders.sfConsumerMock; try { - return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, + apiSecret); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -203,282 +250,339 @@ public class MRClientFactory /*************************************************************************/ /*************************************************************************/ /*************************************************************************/ - + /** - * Create a publisher that sends each message (or group of messages) immediately. Most - * applications should favor higher latency for much higher message throughput and the - * "simple publisher" is not a good choice. - * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. + * Create a publisher that sends each message (or group of messages) + * immediately. Most applications should favor higher latency for much + * higher message throughput and the "simple publisher" is not a good + * choice. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. * @return a publisher */ - public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) - { - return createBatchingPublisher ( hostlist, topic, 1, 1 ); + public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { + return createBatchingPublisher(hostlist, topic, 1, 1); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. Message payloads are not compressed. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. Message payloads are + * not compressed. * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) - { - return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs) { + return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - final TreeSet hosts = new TreeSet (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); + public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + final TreeSet hosts = new TreeSet(); + for (String hp : hostSet) { + hosts.add(hp); } - return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); + return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( Collection hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return new MRSimplerBatchPublisher.Builder (). - againstUrls ( hostSet ). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); + public static MRBatchingPublisher createBatchingPublisher(Collection hostSet, String topic, + int maxBatchSize, long maxAgeMs, boolean compress) { + return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); } - + /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param username username - * @param password password - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * @param protocolFlag http auth or ueb auth or dme2 method - * @param producerFilePath all properties for publisher + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param host + * A host to be used in the URL to MR. Can be "host:port". Use + * multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param username + * username + * @param password + * password + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * @param protocolFlag + * http auth or ueb auth or dme2 method + * @param producerFilePath + * all properties for publisher * @return MRBatchingPublisher obj */ - public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) - { - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(host)). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); - + public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, + final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, + String producerFilePath) { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + pub.setHost(host); pub.setUsername(username); pub.setPassword(password); pub.setProtocolFlag(protocolFlag); - pub.setProducerFilePath(producerFilePath); return pub; } - - + /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown - * @param producerFilePath set all properties for publishing message + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); + public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, withResponse); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - build (); - pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - }else{ - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProducerFilePath(producerFilePath); - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); - } - //pub.setContentType(contentType); - return pub; + return createBatchingPublisher(props); } - + /** - * Create a publisher that will contain send methods that return - * response object to user. - * @param producerFilePath set all properties for publishing message + * Create a publisher that will contain send methods that return response + * object to user. + * + * @param producerFilePath + * set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - withResponse(withResponse). - build (); + return createBatchingPublisher(props, withResponse); + } + + protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + assert props != null; + MRSimplerBatchPublisher pub; + if (withResponse) { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .onTopic(props.getProperty("topic")) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) + .withResponse(withResponse).build(); + } else { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .onTopic(props.getProperty("topic")) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); + } pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - + if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + pub.setAuthKey(props.getProperty("authKey")); pub.setAuthDate(props.getProperty("authDate")); pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); - }else{ + } else { pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); } - pub.setProducerFilePath(producerFilePath); pub.setProtocolFlag(props.getProperty("TransportType")); pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); + routeFilePath = props.getProperty("DME2preferredRouterFilePath"); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); } - //pub.setContentType(contentType); return pub; } - - - - - - - - - - /** * Create an identity manager client to work with API keys. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret * @return an identity manager */ - public static MRIdentityManager createIdentityManager ( Collection hostSet, String apiKey, String apiSecret ) - { + public static MRIdentityManager createIdentityManager(Collection hostSet, String apiKey, String apiSecret) { MRIdentityManager cim; try { - cim = new MRMetaClient ( hostSet ); + cim = new MRMetaClient(hostSet); } catch (MalformedURLException e) { throw new RuntimeException(e); } - cim.setApiCredentials ( apiKey, apiSecret ); + cim.setApiCredentials(apiKey, apiSecret); return cim; } /** * Create a topic manager for working with topics. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret * @return a topic manager */ - public static MRTopicManager createTopicManager ( Collection hostSet, String apiKey, String apiSecret ) - { + public static MRTopicManager createTopicManager(Collection hostSet, String apiKey, String apiSecret) { MRMetaClient tmi; try { - tmi = new MRMetaClient ( hostSet ); + tmi = new MRMetaClient(hostSet); } catch (MalformedURLException e) { throw new RuntimeException(e); } - tmi.setApiCredentials ( apiKey, apiSecret ); + tmi.setApiCredentials(apiKey, apiSecret); return tmi; } /** * Inject a consumer. Used to support unit tests. + * * @param cc */ - public static void $testInject ( MRConsumer cc ) - { + public static void $testInject(MRConsumer cc) { MRClientBuilders.sfConsumerMock = cc; } - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, int i, int j, String protocalFlag, String consumerFilePath) { MRConsumerImpl sub; try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -488,15 +592,15 @@ public class MRClientFactory sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; - + } - - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, String protocalFlag, String consumerFilePath, int i, int j) { MRConsumerImpl sub; try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -506,52 +610,61 @@ public class MRClientFactory sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; - + } - public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (consumerFilePath)); - Properties props = new Properties(); + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(consumerFilePath)); + Properties props = new Properties(); props.load(reader); + + return createConsumer(props); + } + + public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { int timeout; - if(props.getProperty("timeout")!=null) - timeout=Integer.parseInt(props.getProperty("timeout")); + if (props.getProperty("timeout") != null) + timeout = Integer.parseInt(props.getProperty("timeout")); else - timeout=-1; + timeout = -1; int limit; - if(props.getProperty("limit")!=null) - limit=Integer.parseInt(props.getProperty("limit")); + if (props.getProperty("limit") != null) + limit = Integer.parseInt(props.getProperty("limit")); else - limit=-1; + limit = -1; String group; - if(props.getProperty("group")==null) - group=UUID.randomUUID ().toString(); + if (props.getProperty("group") == null) + group = UUID.randomUUID().toString(); else - group=props.getProperty("group"); - MRConsumerImpl sub=null; - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); + group = props.getProperty("group"); + MRConsumerImpl sub = null; + if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty("authKey"), props.getProperty("authDate")); sub.setAuthKey(props.getProperty("authKey")); sub.setAuthDate(props.getProperty("authDate")); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); - }else{ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); + } else { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty("username"), props.getProperty("password")); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); } sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); - sub.setProps(props); + sub.setProps(props); sub.setHost(props.getProperty("host")); sub.setProtocolFlag(props.getProperty("TransportType")); - //sub.setConsumerFilePath(consumerFilePath); + // sub.setConsumerFilePath(consumerFilePath); sub.setfFilter(props.getProperty("filter")); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); + routeFilePath = props.getProperty("DME2preferredRouterFilePath"); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); } return sub; } -- cgit 1.2.3-korg