diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/att/nsa/mr/client/MRClientFactory.java | 1247 |
1 files changed, 630 insertions, 617 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java index a51e7eb..689190e 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -4,6 +4,8 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -58,621 +60,632 @@ import com.att.nsa.mr.tools.ValidatorUtil; * @author author */ public class MRClientFactory { - public static MultivaluedMap<String, Object> HTTPHeadersMap; - public static Map<String, String> DME2HeadersMap; - public static String routeFilePath; - - public static FileReader routeReader; - - public static FileWriter routeWriter = null; - public static Properties prop = null; - - - // props= new Properties(); - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "hostname:8080," - * - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer(String hostList, String topic) { - return createConsumer(MRConsumerImpl.stringToList(hostList), topic); - } - - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { - return createConsumer(hostSet, topic, null); - } - - /** - * Create a consumer instance with server-side filtering, the default - * timeout, and no limit on messages returned. This consumer operates as an - * independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param filter - * a filter to use on the server side - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { - return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { - return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, - filter, apiKey, apiSecret); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { - if (MRClientBuilders.sfConsumerMock != null) - return MRClientBuilders.sfConsumerMock; - try { - return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, - apiSecret); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * Create a publisher that sends each message (or group of messages) - * immediately. Most applications should favor higher latency for much - * higher message throughput and the "simple publisher" is not a good - * choice. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @return a publisher - */ - public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { - return createBatchingPublisher(hostlist, topic, 1, 1); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. Message payloads are - * not compressed. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs) { - return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { - return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { - final TreeSet<String> hosts = new TreeSet<String>(); - for (String hp : hostSet) { - hosts.add(hp); - } - return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, - int maxBatchSize, long maxAgeMs, boolean compress) { - return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) - .compress(compress).build(); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param host - * A host to be used in the URL to MR. Can be "host:port". Use - * multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param username - * username - * @param password - * password - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * @param protocolFlag - * http auth or ueb auth or dme2 method - * @param producerFilePath - * all properties for publisher - * @return MRBatchingPublisher obj - */ - public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, - final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, - String producerFilePath) { - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() - .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) - .compress(compress).build(); - - pub.setHost(host); - pub.setUsername(username); - pub.setPassword(password); - pub.setProtocolFlag(protocolFlag); - return pub; - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param Properties - * props set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) - throws FileNotFoundException, IOException { - return createInternalBatchingPublisher(props, withResponse); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param Properties - * props set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(Properties props) - throws FileNotFoundException, IOException { - return createInternalBatchingPublisher(props, false); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param producerFilePath - * set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) - throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); - Properties props = new Properties(); - props.load(reader); - return createBatchingPublisher(props); - } - - /** - * Create a publisher that will contain send methods that return response - * object to user. - * - * @param producerFilePath - * set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) - throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); - Properties props = new Properties(); - props.load(reader); - return createBatchingPublisher(props, withResponse); - } - - protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) - throws FileNotFoundException, IOException { - assert props != null; - MRSimplerBatchPublisher pub; - if (withResponse) { - pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) - .onTopic(props.getProperty("topic")) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) - .withResponse(withResponse).build(); - } else { - pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) - .onTopic(props.getProperty("topic")) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); - } - pub.setHost(props.getProperty("host")); - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } else { - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - prop = new Properties(); - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { - routeFilePath = props.getProperty("DME2preferredRouterFilePath"); - routeReader = new FileReader(new File(routeFilePath)); - File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); - } - } - return pub; - } - - /** - * Create an identity manager client to work with API keys. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return an identity manager - */ - public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { - MRIdentityManager cim; - try { - cim = new MRMetaClient(hostSet); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - cim.setApiCredentials(apiKey, apiSecret); - return cim; - } - - /** - * Create a topic manager for working with topics. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return a topic manager - */ - public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { - MRMetaClient tmi; - try { - tmi = new MRMetaClient(hostSet); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - tmi.setApiCredentials(apiKey, apiSecret); - return tmi; - } - - /** - * Inject a consumer. Used to support unit tests. - * - * @param cc - */ - public static void $testInject(MRConsumer cc) { - MRClientBuilders.sfConsumerMock = cc; - } - - public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, int i, int j, String protocalFlag, String consumerFilePath) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, String protocalFlag, String consumerFilePath, int i, int j) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(consumerFilePath)); - Properties props = new Properties(); - props.load(reader); - - return createConsumer(props); - } - - public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { - int timeout; - ValidatorUtil.validateSubscriber(props); - if (props.getProperty("timeout") != null) - timeout = Integer.parseInt(props.getProperty("timeout")); - else - timeout = -1; - int limit; - if (props.getProperty("limit") != null) - limit = Integer.parseInt(props.getProperty("limit")); - else - limit = -1; - String group; - if (props.getProperty("group") == null) - group = UUID.randomUUID().toString(); - else - group = props.getProperty("group"); - MRConsumerImpl sub = null; - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), - group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), - props.getProperty("authKey"), props.getProperty("authDate")); - sub.setAuthKey(props.getProperty("authKey")); - sub.setAuthDate(props.getProperty("authDate")); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - } else { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), - group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), - props.getProperty("username"), props.getProperty("password")); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - } - - sub.setProps(props); - sub.setHost(props.getProperty("host")); - sub.setProtocolFlag(props.getProperty("TransportType")); - sub.setfFilter(props.getProperty("filter")); - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { - sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); - routeFilePath = props.getProperty("DME2preferredRouterFilePath"); - routeReader = new FileReader(new File(routeFilePath)); - prop = new Properties(); - File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); - } - } - - return sub; - } + private static final String AUTH_KEY = "authKey"; + private static final String AUTH_DATE = "authDate"; + private static final String PASSWORD = "password"; + private static final String USERNAME = "username"; + private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath"; + private static final String TOPIC = "topic"; + private static final String TRANSPORT_TYPE = "TransportType"; + public static MultivaluedMap<String, Object> HTTPHeadersMap; + public static Map<String, String> DME2HeadersMap; + public static String routeFilePath; + + public static FileReader routeReader; + + public static FileWriter routeWriter = null; + public static Properties prop = null; + + /** + * Instantiates MRClientFactory. + */ + private MRClientFactory() { + //prevents instantiation. + } + + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "hostname:8080," + * + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer(String hostList, String topic) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic); + } + + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { + return createConsumer(hostSet, topic, null); + } + + /** + * Create a consumer instance with server-side filtering, the default + * timeout, and no limit on messages returned. This consumer operates as an + * independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param filter + * a filter to use on the server side + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { + return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, + filter, apiKey, apiSecret); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + if (MRClientBuilders.sfConsumerMock != null) + return MRClientBuilders.sfConsumerMock; + try { + return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, + apiSecret); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * Create a publisher that sends each message (or group of messages) + * immediately. Most applications should favor higher latency for much + * higher message throughput and the "simple publisher" is not a good + * choice. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @return a publisher + */ + public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { + return createBatchingPublisher(hostlist, topic, 1, 1); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. Message payloads are + * not compressed. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs) { + return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + final TreeSet<String> hosts = new TreeSet<String>(); + for (String hp : hostSet) { + hosts.add(hp); + } + return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, + int maxBatchSize, long maxAgeMs, boolean compress) { + return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param host + * A host to be used in the URL to MR. Can be "host:port". Use + * multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param username + * username + * @param password + * password + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * @param protocolFlag + * http auth or ueb auth or dme2 method + * @param producerFilePath + * all properties for publisher + * @return MRBatchingPublisher obj + */ + public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, + final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + + pub.setHost(host); + pub.setUsername(username); + pub.setPassword(password); + pub.setProtocolFlag(protocolFlag); + return pub; + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, withResponse); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); + props.load(reader); + return createBatchingPublisher(props); + } + + /** + * Create a publisher that will contain send methods that return response + * object to user. + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); + props.load(reader); + return createBatchingPublisher(props, withResponse); + } + + protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + assert props != null; + MRSimplerBatchPublisher pub; + if (withResponse) { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .onTopic(props.getProperty(TOPIC)) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) + .withResponse(withResponse).build(); + } else { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .onTopic(props.getProperty(TOPIC)) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); + } + pub.setHost(props.getProperty("host")); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + + pub.setAuthKey(props.getProperty(AUTH_KEY)); + pub.setAuthDate(props.getProperty(AUTH_DATE)); + pub.setUsername(props.getProperty(USERNAME)); + pub.setPassword(props.getProperty(PASSWORD)); + } else { + pub.setUsername(props.getProperty(USERNAME)); + pub.setPassword(props.getProperty(PASSWORD)); + } + pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); + pub.setProps(props); + prop = new Properties(); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); + routeReader = new FileReader(new File(routeFilePath)); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } + } + return pub; + } + + /** + * Create an identity manager client to work with API keys. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return an identity manager + */ + public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { + MRIdentityManager cim; + try { + cim = new MRMetaClient(hostSet); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + cim.setApiCredentials(apiKey, apiSecret); + return cim; + } + + /** + * Create a topic manager for working with topics. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return a topic manager + */ + public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { + MRMetaClient tmi; + try { + tmi = new MRMetaClient(hostSet); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + tmi.setApiCredentials(apiKey, apiSecret); + return tmi; + } + + /** + * Inject a consumer. Used to support unit tests. + * + * @param cc + */ + public static void $testInject(MRConsumer cc) { + MRClientBuilders.sfConsumerMock = cc; + } + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, int i, int j, String protocalFlag, String consumerFilePath) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, String protocalFlag, String consumerFilePath, int i, int j) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(consumerFilePath)); + Properties props = new Properties(); + props.load(reader); + + return createConsumer(props); + } + + public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { + int timeout; + ValidatorUtil.validateSubscriber(props); + if (props.getProperty("timeout") != null) + timeout = Integer.parseInt(props.getProperty("timeout")); + else + timeout = -1; + int limit; + if (props.getProperty("limit") != null) + limit = Integer.parseInt(props.getProperty("limit")); + else + limit = -1; + String group; + if (props.getProperty("group") == null) + group = UUID.randomUUID().toString(); + else + group = props.getProperty("group"); + MRConsumerImpl sub = null; + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE)); + sub.setAuthKey(props.getProperty(AUTH_KEY)); + sub.setAuthDate(props.getProperty(AUTH_DATE)); + sub.setUsername(props.getProperty(USERNAME)); + sub.setPassword(props.getProperty(PASSWORD)); + } else { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty(USERNAME), props.getProperty(PASSWORD)); + sub.setUsername(props.getProperty(USERNAME)); + sub.setPassword(props.getProperty(PASSWORD)); + } + + sub.setProps(props); + sub.setHost(props.getProperty("host")); + sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); + sub.setfFilter(props.getProperty("filter")); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH)); + routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } + } + + return sub; + } } |