diff options
Diffstat (limited to 'src/main/java')
6 files changed, 1052 insertions, 975 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java index 76df039..73ef5c4 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java @@ -4,6 +4,8 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -40,344 +42,408 @@ import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; */ public class MRClientBuilders { - /** - * A builder for a topic Consumer - * @author author - */ - public static class ConsumerBuilder - { - /** - * Construct a consumer builder. - */ - public ConsumerBuilder () {} - - /** - * Set the host list - * @param hostList a comma-separated list of hosts to use to connect to MR - * @return this builder - */ - public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } - - /** - * Set the host list - * @param hostSet a set of hosts to use to connect to MR - * @return this builder - */ - public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } - - /** - * Set the topic - * @param topic the name of the topic to consume - * @return this builder - */ - public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; } - - /** - * Set the consumer's group and ID - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consumer in its group - * @return this builder - */ - public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Set the server side timeout - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. - * @return this builder - */ - public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; }; - - /** - * Set the maximum number of messages to receive per transaction - * @param limit The maximum number of messages to receive from the server in one transaction. - * @return this builder - */ - public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; }; - - /** - * Set a filter to use on the server - * @param filter a Highland Park standard library filter encoded in JSON - * @return this builder - */ - public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; } - - /** - * Build the consumer - * @return a consumer - */ - public MRConsumer build () - { - if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - if ( fGroup == null ) - { - fGroup = UUID.randomUUID ().toString (); - fId = "0"; - log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." ); - } - - if ( sfConsumerMock != null ) return sfConsumerMock; - try { - return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - } - - private Collection<String> fHosts = null; - private String fTopic = null; - private String fGroup = null; - private String fId = null; - private String fApiKey = null; - private String fApiSecret = null; - private int fTimeoutMs = -1; - private int fLimit = -1; - private String fFilter = null; - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * A publisher builder - * @author author - */ - public static class PublisherBuilder - { - public PublisherBuilder () {} - - /** - * Set the MR/UEB host(s) to use - * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @return this builder - */ - public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); } - - /** - * Set the MR/UEB host(s) to use - * @param hostSet The host(s) used in the URL to MR. Can be "host:port" - * @return this builder - */ - public PublisherBuilder usingHosts ( String[] hostSet ) - { - final TreeSet<String> hosts = new TreeSet<String> (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); - } - return usingHosts ( hosts ); - } - - /** - * Set the MR/UEB host(s) to use - * @param hostlist The host(s) used in the URL to MR. Can be "host:port". - * @return this builder - */ - public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; } - - /** - * Set the topic to publish on - * @param topic The topic on which to publish messages. - * @return this builder - */ - public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; } - - /** - * Batch message sends with the given limits. - * @param messageCount The largest set of messages to batch. - * @param ageInMs The maximum age of a message waiting in a batch. - * @return this builder - */ - public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; } - - /** - * Compress transactions - * @return this builder - */ - public PublisherBuilder withCompresion () { return enableCompresion(true); } - - /** - * Do not compress transactions - * @return this builder - */ - public PublisherBuilder withoutCompresion () { return enableCompresion(false); } - - /** - * Set the compression option - * @param compress true to gzip compress transactions - * @return this builder - */ - public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Build the publisher - * @return a batching publisher - */ - public MRBatchingPublisher build () - { - if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - if ( sfPublisherMock != null ) return sfPublisherMock; - - final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls ( fHosts ). - onTopic ( fTopic ). - batchTo ( fMaxBatchSize, fMaxBatchAgeMs ). - compress ( fCompress ). - build (); - if ( fApiKey != null ) - { - pub.setApiCredentials ( fApiKey, fApiSecret ); - } - return pub; - } - - private Collection<String> fHosts = null; - private String fTopic = null; - private int fMaxBatchSize = 1; - private int fMaxBatchAgeMs = 1; - private boolean fCompress = false; - private String fApiKey = null; - private String fApiSecret = null; - } - - /** - * A builder for an identity manager - * @author author - */ - public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> - { - /** - * Construct an identity manager builder. - */ - public IdentityManagerBuilder () {} - - @Override - protected MRIdentityManager constructClient ( Collection<String> hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } } - } - - /** - * A builder for a topic manager - * @author author - */ - public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> - { - /** - * Construct an topic manager builder. - */ - public TopicManagerBuilder () {} - - @Override - protected MRTopicManager constructClient ( Collection<String> hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } } - } - - /** - * Inject a consumer. Used to support unit tests. - * @param cc - */ - public static void $testInject ( MRConsumer cc ) - { - sfConsumerMock = cc; - } - - /** - * Inject a publisher. Used to support unit tests. - * @param pub - */ - public static void $testInject ( MRBatchingPublisher pub ) - { - sfPublisherMock = pub; - } - - static MRConsumer sfConsumerMock = null; - static MRBatchingPublisher sfPublisherMock = null; - - /** - * A builder for an identity manager - * @author author - */ - public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient> - { - /** - * Construct an identity manager builder. - */ - public AbstractAuthenticatedManagerBuilder () {} - - /** - * Set the host list - * @param hostList a comma-separated list of hosts to use to connect to MR - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } - - /** - * Set the host list - * @param hostSet a set of hosts to use to connect to MR - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Build the consumer - * @return a consumer - */ - public T build () - { - if ( fHosts.isEmpty() ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - final T mgr = constructClient ( fHosts ); - mgr.setApiCredentials ( fApiKey, fApiSecret ); - return mgr; - } - - protected abstract T constructClient ( Collection<String> hosts ); - - private Collection<String> fHosts = null; - private String fApiKey = null; - private String fApiSecret = null; - } - - private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class ); + + /** + * Instantiates MRClientBuilders. + */ + private MRClientBuilders() { + // prevent instantiation + } + + /** + * A builder for a topic Consumer + * @author author + */ + public static class ConsumerBuilder + { + /** + * Construct a consumer builder. + */ + public ConsumerBuilder () {} + + /** + * Set the host list + * @param hostList a comma-separated list of hosts to use to connect to MR + * @return this builder + */ + public ConsumerBuilder usingHosts ( String hostList ) { + return usingHosts ( MRConsumerImpl.stringToList(hostList) ); + } + + /** + * Set the host list + * @param hostSet a set of hosts to use to connect to MR + * @return this builder + */ + public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { + fHosts = hostSet; return this; + } + + /** + * Set the topic + * @param topic the name of the topic to consume + * @return this builder + */ + public ConsumerBuilder onTopic ( String topic ) { + fTopic=topic; + return this; + } + + /** + * Set the consumer's group and ID + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consumer in its group + * @return this builder + */ + public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { + fGroup = consumerGroup; + fId = consumerId; + return this; + } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; + } + + /** + * Set the server side timeout + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. + * @return this builder + */ + public ConsumerBuilder waitAtServer ( int timeoutMs ) { + fTimeoutMs = timeoutMs; + return this; + }; + + /** + * Set the maximum number of messages to receive per transaction + * @param limit The maximum number of messages to receive from the server in one transaction. + * @return this builder + */ + public ConsumerBuilder receivingAtMost ( int limit ) { + fLimit = limit; + return this; + }; + + /** + * Set a filter to use on the server + * @param filter a Highland Park standard library filter encoded in JSON + * @return this builder + */ + public ConsumerBuilder withServerSideFilter ( String filter ) { + fFilter = filter; + return this; + } + + /** + * Build the consumer + * @return a consumer + */ + public MRConsumer build () + { + if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + if ( fGroup == null ) + { + fGroup = UUID.randomUUID ().toString (); + fId = "0"; + log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." ); + } + + if ( sfConsumerMock != null ) return sfConsumerMock; + try { + return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret ); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + private Collection<String> fHosts = null; + private String fTopic = null; + private String fGroup = null; + private String fId = null; + private String fApiKey = null; + private String fApiSecret = null; + private int fTimeoutMs = -1; + private int fLimit = -1; + private String fFilter = null; + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * A publisher builder + * @author author + */ + public static class PublisherBuilder + { + public PublisherBuilder () {} + + /** + * Set the MR/UEB host(s) to use + * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @return this builder + */ + public PublisherBuilder usingHosts ( String hostlist ) { + return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); + } + + /** + * Set the MR/UEB host(s) to use + * @param hostSet The host(s) used in the URL to MR. Can be "host:port" + * @return this builder + */ + public PublisherBuilder usingHosts ( String[] hostSet ) + { + final TreeSet<String> hosts = new TreeSet<String> (); + for ( String hp : hostSet ) + { + hosts.add ( hp ); + } + return usingHosts ( hosts ); + } + + /** + * Set the MR/UEB host(s) to use + * @param hostlist The host(s) used in the URL to MR. Can be "host:port". + * @return this builder + */ + public PublisherBuilder usingHosts ( Collection<String> hostlist ) { + fHosts=hostlist; + return this; + } + + /** + * Set the topic to publish on + * @param topic The topic on which to publish messages. + * @return this builder + */ + public PublisherBuilder onTopic ( String topic ) { + fTopic = topic; + return this; + } + + /** + * Batch message sends with the given limits. + * @param messageCount The largest set of messages to batch. + * @param ageInMs The maximum age of a message waiting in a batch. + * @return this builder + */ + public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { + fMaxBatchSize = messageCount; + fMaxBatchAgeMs = ageInMs; + return this; + } + + /** + * Compress transactions + * @return this builder + */ + public PublisherBuilder withCompresion () { + return enableCompresion(true); + } + + /** + * Do not compress transactions + * @return this builder + */ + public PublisherBuilder withoutCompresion () { + return enableCompresion(false); + } + + /** + * Set the compression option + * @param compress true to gzip compress transactions + * @return this builder + */ + public PublisherBuilder enableCompresion ( boolean compress ) { + fCompress = compress; + return this; + } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; + } + + /** + * Build the publisher + * @return a batching publisher + */ + public MRBatchingPublisher build () + { + if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + if ( sfPublisherMock != null ) return sfPublisherMock; + + final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls ( fHosts ). + onTopic ( fTopic ). + batchTo ( fMaxBatchSize, fMaxBatchAgeMs ). + compress ( fCompress ). + build (); + if ( fApiKey != null ) + { + pub.setApiCredentials ( fApiKey, fApiSecret ); + } + return pub; + } + + private Collection<String> fHosts = null; + private String fTopic = null; + private int fMaxBatchSize = 1; + private int fMaxBatchAgeMs = 1; + private boolean fCompress = false; + private String fApiKey = null; + private String fApiSecret = null; + } + + /** + * A builder for an identity manager + * @author author + */ + public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> + { + /** + * Construct an identity manager builder. + */ + public IdentityManagerBuilder () {} + + @Override + protected MRIdentityManager constructClient ( Collection<String> hosts ) { try { + return new MRMetaClient ( hosts ); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } } + } + + /** + * A builder for a topic manager + * @author author + */ + public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> + { + /** + * Construct an topic manager builder. + */ + public TopicManagerBuilder () {} + + @Override + protected MRTopicManager constructClient ( Collection<String> hosts ) { try { + return new MRMetaClient ( hosts ); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } } + } + + /** + * Inject a consumer. Used to support unit tests. + * @param cc + */ + public static void $testInject ( MRConsumer cc ) + { + sfConsumerMock = cc; + } + + /** + * Inject a publisher. Used to support unit tests. + * @param pub + */ + public static void $testInject ( MRBatchingPublisher pub ) + { + sfPublisherMock = pub; + } + + static MRConsumer sfConsumerMock = null; + static MRBatchingPublisher sfPublisherMock = null; + + /** + * A builder for an identity manager + * @author author + */ + public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient> + { + /** + * Construct an identity manager builder. + */ + public AbstractAuthenticatedManagerBuilder () {} + + /** + * Set the host list + * @param hostList a comma-separated list of hosts to use to connect to MR + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { + return usingHosts ( MRConsumerImpl.stringToList(hostList) ); + } + + /** + * Set the host list + * @param hostSet a set of hosts to use to connect to MR + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { + fHosts = hostSet; + return this; + } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; + } + + /** + * Build the consumer + * @return a consumer + */ + public T build () + { + if ( fHosts.isEmpty() ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + final T mgr = constructClient ( fHosts ); + mgr.setApiCredentials ( fApiKey, fApiSecret ); + return mgr; + } + + protected abstract T constructClient ( Collection<String> hosts ); + + private Collection<String> fHosts = null; + private String fApiKey = null; + private String fApiSecret = null; + } + + private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class ); } diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java index 97aa0b8..689190e 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -4,6 +4,8 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -58,617 +60,632 @@ import com.att.nsa.mr.tools.ValidatorUtil; * @author author */ public class MRClientFactory { - public static MultivaluedMap<String, Object> HTTPHeadersMap; - public static Map<String, String> DME2HeadersMap; - public static String routeFilePath; - - public static FileReader routeReader; - - public static FileWriter routeWriter = null; - public static Properties prop = null; - - - // props= new Properties(); - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "hostname:8080," - * - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer(String hostList, String topic) { - return createConsumer(MRConsumerImpl.stringToList(hostList), topic); - } - - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { - return createConsumer(hostSet, topic, null); - } - - /** - * Create a consumer instance with server-side filtering, the default - * timeout, and no limit on messages returned. This consumer operates as an - * independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param filter - * a filter to use on the server side - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { - return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { - return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, - filter, apiKey, apiSecret); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { - if (MRClientBuilders.sfConsumerMock != null) - return MRClientBuilders.sfConsumerMock; - try { - return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, - apiSecret); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * Create a publisher that sends each message (or group of messages) - * immediately. Most applications should favor higher latency for much - * higher message throughput and the "simple publisher" is not a good - * choice. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @return a publisher - */ - public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { - return createBatchingPublisher(hostlist, topic, 1, 1); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. Message payloads are - * not compressed. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs) { - return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { - return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { - final TreeSet<String> hosts = new TreeSet<String>(); - for (String hp : hostSet) { - hosts.add(hp); - } - return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, - int maxBatchSize, long maxAgeMs, boolean compress) { - return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) - .compress(compress).build(); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param host - * A host to be used in the URL to MR. Can be "host:port". Use - * multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param username - * username - * @param password - * password - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * @param protocolFlag - * http auth or ueb auth or dme2 method - * @param producerFilePath - * all properties for publisher - * @return MRBatchingPublisher obj - */ - public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, - final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, - String producerFilePath) { - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() - .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) - .compress(compress).build(); - - pub.setHost(host); - pub.setUsername(username); - pub.setPassword(password); - pub.setProtocolFlag(protocolFlag); - return pub; - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param Properties - * props set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) - throws FileNotFoundException, IOException { - return createInternalBatchingPublisher(props, withResponse); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param Properties - * props set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(Properties props) - throws FileNotFoundException, IOException { - return createInternalBatchingPublisher(props, false); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown - * - * @param producerFilePath - * set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) - throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); - Properties props = new Properties(); - props.load(reader); - return createBatchingPublisher(props); - } - - /** - * Create a publisher that will contain send methods that return response - * object to user. - * - * @param producerFilePath - * set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex - */ - public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) - throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); - Properties props = new Properties(); - props.load(reader); - return createBatchingPublisher(props, withResponse); - } - - protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) - throws FileNotFoundException, IOException { - assert props != null; - MRSimplerBatchPublisher pub; - if (withResponse) { - pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) - .onTopic(props.getProperty("topic")) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) - .withResponse(withResponse).build(); - } else { - pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) - .onTopic(props.getProperty("topic")) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); - } - pub.setHost(props.getProperty("host")); - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } else { - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - prop = new Properties(); - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { - routeFilePath = props.getProperty("DME2preferredRouterFilePath"); - routeReader = new FileReader(new File(routeFilePath)); - File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); - } - } - return pub; - } - - /** - * Create an identity manager client to work with API keys. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return an identity manager - */ - public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { - MRIdentityManager cim; - try { - cim = new MRMetaClient(hostSet); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - cim.setApiCredentials(apiKey, apiSecret); - return cim; - } - - /** - * Create a topic manager for working with topics. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return a topic manager - */ - public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { - MRMetaClient tmi; - try { - tmi = new MRMetaClient(hostSet); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - tmi.setApiCredentials(apiKey, apiSecret); - return tmi; - } - - /** - * Inject a consumer. Used to support unit tests. - * - * @param cc - */ - public static void $testInject(MRConsumer cc) { - MRClientBuilders.sfConsumerMock = cc; - } - - public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, int i, int j, String protocalFlag, String consumerFilePath) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, String protocalFlag, String consumerFilePath, int i, int j) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(consumerFilePath)); - Properties props = new Properties(); - props.load(reader); - - return createConsumer(props); - } - - public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { - int timeout; - ValidatorUtil.validateSubscriber(props); - if (props.getProperty("timeout") != null) - timeout = Integer.parseInt(props.getProperty("timeout")); - else - timeout = -1; - int limit; - if (props.getProperty("limit") != null) - limit = Integer.parseInt(props.getProperty("limit")); - else - limit = -1; - String group; - if (props.getProperty("group") == null) - group = UUID.randomUUID().toString(); - else - group = props.getProperty("group"); - MRConsumerImpl sub = null; - if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), - group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), - props.getProperty("authKey"), props.getProperty("authDate")); - sub.setAuthKey(props.getProperty("authKey")); - sub.setAuthDate(props.getProperty("authDate")); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - } else { - sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), - group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), - props.getProperty("username"), props.getProperty("password")); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - } - sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); - sub.setProps(props); - sub.setHost(props.getProperty("host")); - sub.setProtocolFlag(props.getProperty("TransportType")); - sub.setfFilter(props.getProperty("filter")); - routeFilePath = props.getProperty("DME2preferredRouterFilePath"); - routeReader = new FileReader(new File(routeFilePath)); - prop = new Properties(); - File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); - } - return sub; - } + private static final String AUTH_KEY = "authKey"; + private static final String AUTH_DATE = "authDate"; + private static final String PASSWORD = "password"; + private static final String USERNAME = "username"; + private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath"; + private static final String TOPIC = "topic"; + private static final String TRANSPORT_TYPE = "TransportType"; + public static MultivaluedMap<String, Object> HTTPHeadersMap; + public static Map<String, String> DME2HeadersMap; + public static String routeFilePath; + + public static FileReader routeReader; + + public static FileWriter routeWriter = null; + public static Properties prop = null; + + /** + * Instantiates MRClientFactory. + */ + private MRClientFactory() { + //prevents instantiation. + } + + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "hostname:8080," + * + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer(String hostList, String topic) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic); + } + + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { + return createConsumer(hostSet, topic, null); + } + + /** + * Create a consumer instance with server-side filtering, the default + * timeout, and no limit on messages returned. This consumer operates as an + * independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param filter + * a filter to use on the server side + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { + return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, + filter, apiKey, apiSecret); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + if (MRClientBuilders.sfConsumerMock != null) + return MRClientBuilders.sfConsumerMock; + try { + return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, + apiSecret); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * Create a publisher that sends each message (or group of messages) + * immediately. Most applications should favor higher latency for much + * higher message throughput and the "simple publisher" is not a good + * choice. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @return a publisher + */ + public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { + return createBatchingPublisher(hostlist, topic, 1, 1); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. Message payloads are + * not compressed. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs) { + return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + final TreeSet<String> hosts = new TreeSet<String>(); + for (String hp : hostSet) { + hosts.add(hp); + } + return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, + int maxBatchSize, long maxAgeMs, boolean compress) { + return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param host + * A host to be used in the URL to MR. Can be "host:port". Use + * multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param username + * username + * @param password + * password + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * @param protocolFlag + * http auth or ueb auth or dme2 method + * @param producerFilePath + * all properties for publisher + * @return MRBatchingPublisher obj + */ + public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, + final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + + pub.setHost(host); + pub.setUsername(username); + pub.setPassword(password); + pub.setProtocolFlag(protocolFlag); + return pub; + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, withResponse); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); + props.load(reader); + return createBatchingPublisher(props); + } + + /** + * Create a publisher that will contain send methods that return response + * object to user. + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); + props.load(reader); + return createBatchingPublisher(props, withResponse); + } + + protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + assert props != null; + MRSimplerBatchPublisher pub; + if (withResponse) { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .onTopic(props.getProperty(TOPIC)) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) + .withResponse(withResponse).build(); + } else { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .onTopic(props.getProperty(TOPIC)) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); + } + pub.setHost(props.getProperty("host")); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + + pub.setAuthKey(props.getProperty(AUTH_KEY)); + pub.setAuthDate(props.getProperty(AUTH_DATE)); + pub.setUsername(props.getProperty(USERNAME)); + pub.setPassword(props.getProperty(PASSWORD)); + } else { + pub.setUsername(props.getProperty(USERNAME)); + pub.setPassword(props.getProperty(PASSWORD)); + } + pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); + pub.setProps(props); + prop = new Properties(); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); + routeReader = new FileReader(new File(routeFilePath)); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } + } + return pub; + } + + /** + * Create an identity manager client to work with API keys. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return an identity manager + */ + public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { + MRIdentityManager cim; + try { + cim = new MRMetaClient(hostSet); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + cim.setApiCredentials(apiKey, apiSecret); + return cim; + } + + /** + * Create a topic manager for working with topics. + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return a topic manager + */ + public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { + MRMetaClient tmi; + try { + tmi = new MRMetaClient(hostSet); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + tmi.setApiCredentials(apiKey, apiSecret); + return tmi; + } + + /** + * Inject a consumer. Used to support unit tests. + * + * @param cc + */ + public static void $testInject(MRConsumer cc) { + MRClientBuilders.sfConsumerMock = cc; + } + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, int i, int j, String protocalFlag, String consumerFilePath) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, String protocalFlag, String consumerFilePath, int i, int j) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(consumerFilePath)); + Properties props = new Properties(); + props.load(reader); + + return createConsumer(props); + } + + public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { + int timeout; + ValidatorUtil.validateSubscriber(props); + if (props.getProperty("timeout") != null) + timeout = Integer.parseInt(props.getProperty("timeout")); + else + timeout = -1; + int limit; + if (props.getProperty("limit") != null) + limit = Integer.parseInt(props.getProperty("limit")); + else + limit = -1; + String group; + if (props.getProperty("group") == null) + group = UUID.randomUUID().toString(); + else + group = props.getProperty("group"); + MRConsumerImpl sub = null; + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE)); + sub.setAuthKey(props.getProperty(AUTH_KEY)); + sub.setAuthDate(props.getProperty(AUTH_DATE)); + sub.setUsername(props.getProperty(USERNAME)); + sub.setPassword(props.getProperty(PASSWORD)); + } else { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty(USERNAME), props.getProperty(PASSWORD)); + sub.setUsername(props.getProperty(USERNAME)); + sub.setPassword(props.getProperty(PASSWORD)); + } + + sub.setProps(props); + sub.setHost(props.getProperty("host")); + sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); + sub.setfFilter(props.getProperty("filter")); + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH)); + routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } + } + + return sub; + } } diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java index fb49096..76bf5ce 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java @@ -261,19 +261,14 @@ public class MRBaseClient extends HttpClient implements MRClient { } } - public JSONObject getNoAuth(final String path, final String username, final String password, - final String protocolFlag) throws HttpException, JSONException { - if (null != username && null != password) { - WebTarget target=null; - Response response=null; - target = DmaapClientUtil.getTarget(path, username, password); - response = DmaapClientUtil.getResponsewtNoAuth(target); + public JSONObject getNoAuth(final String path) throws HttpException, JSONException { - return getResponseDataInJson(response); - } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); - } + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(path); + response = DmaapClientUtil.getResponsewtNoAuth(target); + + return getResponseDataInJson(response); } public String getAuthResponse(final String path, final String authKey, final String authDate, final String username, diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java index 72d97c9..bc156ea 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -200,7 +200,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); try { - final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag); + final JSONObject o = getNoAuth(urlPath); if (o != null) { final JSONArray a = o.getJSONArray("result"); if (a != null) { diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java index 8509042..4f44d30 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -864,6 +864,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP send(false); } }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS); + pubResponse = new MRPublisherResponse(); } private static class TimestampedMessage extends message { diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java index 900c932..0539582 100644 --- a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java +++ b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java @@ -64,10 +64,6 @@ public class ValidatorUtil { if (id == null || id.isEmpty()) { throw new IllegalArgumentException ( "Consumer (Id) is needed" ); } - String sessionstickinessrequired = props.getProperty("sessionstickinessrequired"); - if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) { - throw new IllegalArgumentException ( "sessionstickinessrequired is needed" ); - } } private static void validateForDME2(Properties props) { @@ -132,6 +128,10 @@ public class ValidatorUtil { if (subContextPath == null || subContextPath.isEmpty()) { throw new IllegalArgumentException ( "SubContextPath is needed" ); } + String sessionstickinessrequired = props.getProperty("sessionstickinessrequired"); + if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) { + throw new IllegalArgumentException ( "sessionstickinessrequired is needed" ); + } } private static void validateForNonDME2(Properties props) { @@ -144,14 +144,11 @@ public class ValidatorUtil { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException ( "topic is needed" ); } - String methodType = props.getProperty("MethodType"); - if (methodType == null || methodType.isEmpty()) { - throw new IllegalArgumentException ( "MethodType is needed" ); - } String contenttype = props.getProperty("contenttype"); if (contenttype == null || contenttype.isEmpty()) { throw new IllegalArgumentException ( "contenttype is needed" ); } + if (!ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(transportType)){ String username = props.getProperty("username"); if (username == null || username.isEmpty()) { throw new IllegalArgumentException ( "username is needed" ); @@ -160,6 +157,7 @@ public class ValidatorUtil { if (password == null || password.isEmpty()) { throw new IllegalArgumentException ( "password is needed" ); } + } if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) { String authKey = props.getProperty("authKey"); if (authKey == null || authKey.isEmpty()) { @@ -167,7 +165,7 @@ public class ValidatorUtil { } String authDate = props.getProperty("authDate"); if (authDate == null || authDate.isEmpty()) { - throw new IllegalArgumentException ( "password is needed" ); + throw new IllegalArgumentException ( "authDate is needed" ); } } |