diff options
author | sunil.unnava <su622b@att.com> | 2018-01-23 15:26:15 -0500 |
---|---|---|
committer | sunil.unnava <su622b@att.com> | 2018-01-23 15:36:05 -0500 |
commit | 85c21e1d85c545717affd3f18cd8e9fe6dc14562 (patch) | |
tree | 06909dffa1ac3cb95f08aa1dcebfe32708578e0a /src/main | |
parent | 0497d0508a62ff513ff6883c2f6e1947da968d37 (diff) |
Changes to the DMaap Client
Added new API to the DMaapClient
Issue-ID: DMAAP-214
Change-Id: I4de2da7ca42ad1b5925a2df9d26672875dd15b10
Signed-off-by: sunil.unnava <su622b@att.com>
Diffstat (limited to 'src/main')
7 files changed, 1538 insertions, 1436 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java index 59e472c..b654282 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -43,158 +43,205 @@ import com.att.nsa.mr.test.clients.ProtocolTypeConstants; /** * A factory for MR clients.<br/> * <br/> - * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates - * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive - * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's - * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across - * them. Be sure to use a different ID for each instance.)<br/> + * Use caution selecting a consumer creator factory. If the call doesn't accept + * a consumer group name, then it creates a consumer that is not restartable. + * That is, if you stop your process and start it again, your client will NOT + * receive any missed messages on the topic. If you need to ensure receipt of + * missed messages, then you must use a consumer that's created with a group + * name and ID. (If you create multiple consumer processes using the same group, + * load is split across them. Be sure to use a different ID for each + * instance.)<br/> * <br/> - * Publishers + * Publishers * * @author author */ -public class MRClientFactory -{ +public class MRClientFactory { public static MultivaluedMap<String, Object> HTTPHeadersMap; public static Map<String, String> DME2HeadersMap; public static String routeFilePath; - + public static FileReader routeReader; - - public static FileWriter routeWriter= null; - public static Properties prop=null; - //routeReader= new FileReader(new File (routeFilePath)); - //props= new Properties(); + + public static FileWriter routeWriter = null; + public static Properties prop = null; + + // routeReader= new FileReader(new File (routeFilePath)); + // props= new Properties(); /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "hostname:8080," + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "hostname:8080," * - * @param topic The topic to consume + * @param topic + * The topic to consume * * @return a consumer */ - public static MRConsumer createConsumer ( String hostList, String topic ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); + public static MRConsumer createConsumer(String hostList, String topic) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic); } /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume * * @return a consumer */ - public static MRConsumer createConsumer ( Collection<String> hostSet, String topic ) - { - return createConsumer ( hostSet, topic, null ); + public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { + return createConsumer(hostSet, topic, null); } /** - * Create a consumer instance with server-side filtering, the default timeout, and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * Create a consumer instance with server-side filtering, the default + * timeout, and no limit on messages returned. This consumer operates as an + * independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param filter a filter to use on the server side + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param filter + * a filter to use on the server side * * @return a consumer */ - public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter ) - { - return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); + public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { + return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group * * @return a consumer */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId ) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. * * @return a consumer */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostList + * A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * * @return a consumer */ - public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, - consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, + filter, apiKey, apiSecret); } /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostSet + * The host used in the URL to MR. Entries can be "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * * @return a consumer */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; + public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + if (MRClientBuilders.sfConsumerMock != null) + return MRClientBuilders.sfConsumerMock; try { - return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, + apiSecret); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -203,282 +250,339 @@ public class MRClientFactory /*************************************************************************/ /*************************************************************************/ /*************************************************************************/ - + /** - * Create a publisher that sends each message (or group of messages) immediately. Most - * applications should favor higher latency for much higher message throughput and the - * "simple publisher" is not a good choice. - * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. + * Create a publisher that sends each message (or group of messages) + * immediately. Most applications should favor higher latency for much + * higher message throughput and the "simple publisher" is not a good + * choice. + * + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. * @return a publisher */ - public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) - { - return createBatchingPublisher ( hostlist, topic, 1, 1 ); + public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { + return createBatchingPublisher(hostlist, topic, 1, 1); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. Message payloads are not compressed. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. Message payloads are + * not compressed. * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) - { - return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs) { + return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostlist + * The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); + public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - final TreeSet<String> hosts = new TreeSet<String> (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); + public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + final TreeSet<String> hosts = new TreeSet<String>(); + for (String hp : hostSet) { + hosts.add(hp); } - return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); + return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); } /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression * * @return a publisher */ - public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return new MRSimplerBatchPublisher.Builder (). - againstUrls ( hostSet ). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); + public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, + int maxBatchSize, long maxAgeMs, boolean compress) { + return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); } - + /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param username username - * @param password password - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * @param protocolFlag http auth or ueb auth or dme2 method - * @param producerFilePath all properties for publisher + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param host + * A host to be used in the URL to MR. Can be "host:port". Use + * multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param username + * username + * @param password + * password + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * @param protocolFlag + * http auth or ueb auth or dme2 method + * @param producerFilePath + * all properties for publisher * @return MRBatchingPublisher obj */ - public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) - { - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(host)). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); - + public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, + final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, + String producerFilePath) { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) + .compress(compress).build(); + pub.setHost(host); pub.setUsername(username); pub.setPassword(password); pub.setProtocolFlag(protocolFlag); - pub.setProducerFilePath(producerFilePath); return pub; } - - + /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown - * @param producerFilePath set all properties for publishing message + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); + public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, withResponse); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param Properties + * props set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(Properties props) + throws FileNotFoundException, IOException { + return createInternalBatchingPublisher(props, false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown + * + * @param producerFilePath + * set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex + */ + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - build (); - pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - }else{ - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProducerFilePath(producerFilePath); - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); - } - //pub.setContentType(contentType); - return pub; + return createBatchingPublisher(props); } - + /** - * Create a publisher that will contain send methods that return - * response object to user. - * @param producerFilePath set all properties for publishing message + * Create a publisher that will contain send methods that return response + * object to user. + * + * @param producerFilePath + * set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex + * @throws FileNotFoundException + * exc + * @throws IOException + * ioex */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); + public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) + throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(producerFilePath)); + Properties props = new Properties(); props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - withResponse(withResponse). - build (); + return createBatchingPublisher(props, withResponse); + } + + protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse) + throws FileNotFoundException, IOException { + assert props != null; + MRSimplerBatchPublisher pub; + if (withResponse) { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .onTopic(props.getProperty("topic")) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) + .withResponse(withResponse).build(); + } else { + pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .onTopic(props.getProperty("topic")) + .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), + Integer.parseInt(props.getProperty("maxAgeMs").toString())) + .compress(Boolean.parseBoolean(props.getProperty("compress"))) + .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); + } pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - + if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + pub.setAuthKey(props.getProperty("authKey")); pub.setAuthDate(props.getProperty("authDate")); pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); - }else{ + } else { pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); } - pub.setProducerFilePath(producerFilePath); pub.setProtocolFlag(props.getProperty("TransportType")); pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); + routeFilePath = props.getProperty("DME2preferredRouterFilePath"); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); } - //pub.setContentType(contentType); return pub; } - - - - - - - - - - /** * Create an identity manager client to work with API keys. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret * @return an identity manager */ - public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret ) - { + public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { MRIdentityManager cim; try { - cim = new MRMetaClient ( hostSet ); + cim = new MRMetaClient(hostSet); } catch (MalformedURLException e) { throw new RuntimeException(e); } - cim.setApiCredentials ( apiKey, apiSecret ); + cim.setApiCredentials(apiKey, apiSecret); return cim; } /** * Create a topic manager for working with topics. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret + * + * @param hostSet + * A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret * @return a topic manager */ - public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret ) - { + public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { MRMetaClient tmi; try { - tmi = new MRMetaClient ( hostSet ); + tmi = new MRMetaClient(hostSet); } catch (MalformedURLException e) { throw new RuntimeException(e); } - tmi.setApiCredentials ( apiKey, apiSecret ); + tmi.setApiCredentials(apiKey, apiSecret); return tmi; } /** * Inject a consumer. Used to support unit tests. + * * @param cc */ - public static void $testInject ( MRConsumer cc ) - { + public static void $testInject(MRConsumer cc) { MRClientBuilders.sfConsumerMock = cc; } - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, int i, int j, String protocalFlag, String consumerFilePath) { MRConsumerImpl sub; try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -488,15 +592,15 @@ public class MRClientFactory sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; - + } - - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { + + public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, + String id, String protocalFlag, String consumerFilePath, int i, int j) { MRConsumerImpl sub; try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null); } catch (MalformedURLException e) { throw new RuntimeException(e); } @@ -506,52 +610,61 @@ public class MRClientFactory sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; - + } - public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (consumerFilePath)); - Properties props = new Properties(); + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { + FileReader reader = new FileReader(new File(consumerFilePath)); + Properties props = new Properties(); props.load(reader); + + return createConsumer(props); + } + + public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { int timeout; - if(props.getProperty("timeout")!=null) - timeout=Integer.parseInt(props.getProperty("timeout")); + if (props.getProperty("timeout") != null) + timeout = Integer.parseInt(props.getProperty("timeout")); else - timeout=-1; + timeout = -1; int limit; - if(props.getProperty("limit")!=null) - limit=Integer.parseInt(props.getProperty("limit")); + if (props.getProperty("limit") != null) + limit = Integer.parseInt(props.getProperty("limit")); else - limit=-1; + limit = -1; String group; - if(props.getProperty("group")==null) - group=UUID.randomUUID ().toString(); + if (props.getProperty("group") == null) + group = UUID.randomUUID().toString(); else - group=props.getProperty("group"); - MRConsumerImpl sub=null; - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); + group = props.getProperty("group"); + MRConsumerImpl sub = null; + if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty("authKey"), props.getProperty("authDate")); sub.setAuthKey(props.getProperty("authKey")); sub.setAuthDate(props.getProperty("authDate")); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); - }else{ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); + } else { + sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), + group, props.getProperty("id"), timeout, limit, props.getProperty("filter"), + props.getProperty("username"), props.getProperty("password")); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); } sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); - sub.setProps(props); + sub.setProps(props); sub.setHost(props.getProperty("host")); sub.setProtocolFlag(props.getProperty("TransportType")); - //sub.setConsumerFilePath(consumerFilePath); + // sub.setConsumerFilePath(consumerFilePath); sub.setfFilter(props.getProperty("filter")); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); + routeFilePath = props.getProperty("DME2preferredRouterFilePath"); + routeReader = new FileReader(new File(routeFilePath)); + prop = new Properties(); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); } return sub; } diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java index 012e95e..999d7ef 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java @@ -50,286 +50,318 @@ import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; //import com.fasterxml.jackson.core.JsonProcessingException; -public class MRBaseClient extends HttpClient implements MRClient -{ - +public class MRBaseClient extends HttpClient implements MRClient { + private static final String MR_AUTH_CONSTANT = "X-CambriaAuth"; private static final String MR_DATE_CONSTANT = "X-CambriaDate"; - - protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException - { - super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort ); - - fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - } - protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException { - super ( ConnectionType.HTTP,hosts, stdSvcPort); + protected MRBaseClient(Collection<String> hosts) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort); - fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = LoggerFactory.getLogger(this.getClass().getName()); } - protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException - { - super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); + protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, stdSvcPort); - fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = LoggerFactory.getLogger(this.getClass().getName()); } + protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, + TimeUnit.MILLISECONDS, 32, 32, 600000); + + fLog = LoggerFactory.getLogger(this.getClass().getName()); + } @Override - public void close () - { + public void close() { } - protected Set<String> jsonArrayToSet ( JSONArray a ) - { - if ( a == null ) return null; + protected Set<String> jsonArrayToSet(JSONArray a) { + if (a == null) + return null; - final TreeSet<String> set = new TreeSet<String> (); - for ( int i=0; i<a.length (); i++ ) - { - set.add ( a.getString ( i )); + final TreeSet<String> set = new TreeSet<String>(); + for (int i = 0; i < a.length(); i++) { + set.add(a.getString(i)); } return set; } - public void logTo ( Logger log ) - { + public void logTo(Logger log) { fLog = log; - replaceLogger ( log ); + replaceLogger(log); } - protected Logger getLog () - { + protected Logger getLog() { return fLog; } private Logger fLog; - - public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + + public JSONObject post(final String path, final byte[] data, final String contentType, final String username, + final String password, final String protocolFlag) throws HttpException, JSONException { if ((null != username && null != password)) { WebTarget target = null; Response response = null; - + target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - - response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); - + String encoding = Base64.encodeAsString(username + ":" + password); + + response = target.request().header("Authorization", "Basic " + encoding) + .post(Entity.entity(data, contentType)); + return getResponseDataInJson(response); } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); } } - public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + + public String postWithResponse(final String path, final byte[] data, final String contentType, + final String username, final String password, final String protocolFlag) + throws HttpException, JSONException { String responseData = null; if ((null != username && null != password)) { WebTarget target = null; Response response = null; - + target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - - response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); - + String encoding = Base64.encodeAsString(username + ":" + password); + + response = target.request().header("Authorization", "Basic " + encoding) + .post(Entity.entity(data, contentType)); + responseData = response.readEntity(String.class); return responseData; } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); } } - public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + + public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey, + final String authDate, final String username, final String password, final String protocolFlag) + throws HttpException, JSONException { if ((null != username && null != password)) { WebTarget target = null; Response response = null; - target= getTarget(path,username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .post(Entity.entity(data, contentType)); - + target = getTarget(path, username, password); + response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate) + .post(Entity.entity(data, contentType)); + return getResponseDataInJson(response); } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); } } - public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + + public String postAuthwithResponse(final String path, final byte[] data, final String contentType, + final String authKey, final String authDate, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { String responseData = null; if ((null != username && null != password)) { WebTarget target = null; Response response = null; - target= getTarget(path,username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .post(Entity.entity(data, contentType)); - responseData = response.readEntity(String.class); - return responseData; - + target = getTarget(path, username, password); + response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate) + .post(Entity.entity(data, contentType)); + responseData = response.readEntity(String.class); + return responseData; + } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); } } - - public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + public JSONObject get(final String path, final String username, final String password, final String protocolFlag) + throws HttpException, JSONException { if (null != username && null != password) { - + WebTarget target = null; Response response = null; if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target=getTarget(path); - response = target.request() - .header(MR_AUTH_CONSTANT, username) - .header(MR_DATE_CONSTANT, password) - .get(); + target = getTarget(path); + response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get(); } else { target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - response = target.request().header("Authorization", "Basic " + encoding).get(); - + String encoding = Base64.encodeAsString(username + ":" + password); + + response = target.request().header("Authorization", "Basic " + encoding).get(); + } return getResponseDataInJson(response); } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); } } - - - public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + + public String getResponse(final String path, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { String responseData = null; if (null != username && null != password) { - + WebTarget target = null; Response response = null; if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target=getTarget(path); - response = target.request() - .header(MR_AUTH_CONSTANT, username) - .header(MR_DATE_CONSTANT, password) - .get(); + target = getTarget(path); + response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get(); } else { target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - response = target.request().header("Authorization", "Basic " + encoding).get(); + String encoding = Base64.encodeAsString(username + ":" + password); + response = target.request().header("Authorization", "Basic " + encoding).get(); } - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); + MRClientFactory.HTTPHeadersMap = response.getHeaders(); + + String transactionid = response.getHeaderString("transactionid"); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); } - + responseData = response.readEntity(String.class); return responseData; } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); } } - - public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + + public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username, + final String password, final String protocolFlag) throws HttpException, JSONException { if (null != username && null != password) { - + WebTarget target = null; Response response = null; - target=getTarget(path, username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .get(); - + target = getTarget(path, username, password); + response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get(); + return getResponseDataInJson(response); } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); } } - - public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + + public JSONObject getNoAuth(final String path, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { + if (null != username && null != password) { + + WebTarget target = null; + + Response response = null; + target = getTarget(path, username, password); + response = target.request().get(); + + return getResponseDataInJson(response); + } else { + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + public String getAuthResponse(final String path, final String authKey, final String authDate, final String username, + final String password, final String protocolFlag) throws HttpException, JSONException { String responseData = null; if (null != username && null != password) { - + WebTarget target = null; Response response = null; - target=getTarget(path, username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .get(); - - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); - } - - responseData = response.readEntity(String.class); - return responseData; + target = getTarget(path, username, password); + response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get(); + + MRClientFactory.HTTPHeadersMap = response.getHeaders(); + + String transactionid = response.getHeaderString("transactionid"); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException( + "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + public String getNoAuthResponse(String path, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + + WebTarget target = null; + + Response response = null; + target = getTarget(path, username, password); + response = target.request().get(); + + MRClientFactory.HTTPHeadersMap = response.getHeaders(); + + String transactionid = response.getHeaderString("transactionid"); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); } + + responseData = response.readEntity(String.class); + return responseData; + } private WebTarget getTarget(final String path, final String username, final String password) { Client client = ClientBuilder.newClient(); - - // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types. - HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); - client.register(feature); - + // Using UNIVERSAL as it supports both BASIC and DIGEST authentication + // types. + HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); + client.register(feature); + return client.target(path); } - private WebTarget getTarget(final String path) { Client client = ClientBuilder.newClient(); return client.target(path); } + private JSONObject getResponseDataInJson(Response response) throws JSONException { try { - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - // fLog.info("DMAAP response status: " + response.getStatus()); - - - //MultivaluedMap<String, Object> headersMap = response.getHeaders(); - //for(String key : headersMap.keySet()) { - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { + MRClientFactory.HTTPHeadersMap = response.getHeaders(); + // fLog.info("DMAAP response status: " + response.getStatus()); + + // MultivaluedMap<String, Object> headersMap = + // response.getHeaders(); + // for(String key : headersMap.keySet()) { + String transactionid = response.getHeaderString("transactionid"); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { fLog.info("TransactionId : " + transactionid); } - /*final String responseData = response.readEntity(String.class); - JSONTokener jsonTokener = new JSONTokener(responseData); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else { - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject;*/ - - - if(response.getStatus()==403) { + /* + * final String responseData = response.readEntity(String.class); + * JSONTokener jsonTokener = new JSONTokener(responseData); + * JSONObject jsonObject = null; final char firstChar = + * jsonTokener.next(); jsonTokener.back(); if ('[' == firstChar) { + * JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = + * new JSONObject(); jsonObject.put("result", jsonArray); } else { + * jsonObject = new JSONObject(jsonTokener); } + * + * return jsonObject; + */ + + if (response.getStatus() == 403) { JSONObject jsonObject = null; jsonObject = new JSONObject(); JSONArray jsonArray = new JSONArray(); @@ -339,11 +371,11 @@ public class MRBaseClient extends HttpClient implements MRClient return jsonObject; } String responseData = response.readEntity(String.class); - + JSONTokener jsonTokener = new JSONTokener(responseData); JSONObject jsonObject = null; final char firstChar = jsonTokener.next(); - jsonTokener.back(); + jsonTokener.back(); if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); @@ -361,35 +393,35 @@ public class MRBaseClient extends HttpClient implements MRClient } } - - public String getHTTPErrorResponseMessage(String responseString){ - + + public String getHTTPErrorResponseMessage(String responseString) { + String response = null; int beginIndex = 0; int endIndex = 0; - if(responseString.contains("<body>")){ - - beginIndex = responseString.indexOf("body>")+5; + if (responseString.contains("<body>")) { + + beginIndex = responseString.indexOf("body>") + 5; endIndex = responseString.indexOf("</body"); - response = responseString.substring(beginIndex,endIndex); + response = responseString.substring(beginIndex, endIndex); } - + return response; - + } - - public String getHTTPErrorResponseCode(String responseString){ - + + public String getHTTPErrorResponseCode(String responseString) { + String response = null; int beginIndex = 0; int endIndex = 0; - if(responseString.contains("<title>")){ - beginIndex = responseString.indexOf("title>")+6; + if (responseString.contains("<title>")) { + beginIndex = responseString.indexOf("title>") + 6; endIndex = responseString.indexOf("</title"); - response = responseString.substring(beginIndex,endIndex); + response = responseString.substring(beginIndex, endIndex); } - - return response; + + return response; } - + } diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java index eb7fd91..78f37fc 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -46,46 +46,43 @@ import org.slf4j.LoggerFactory; import com.att.aft.dme2.api.DME2Client; import com.att.aft.dme2.api.DME2Exception; +import com.att.nsa.mr.client.HostSelector; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -public class MRConsumerImpl extends MRBaseClient implements MRConsumer -{ - +public class MRConsumerImpl extends MRBaseClient implements MRConsumer { + private static final String SUCCESS_MESSAGE = "Success"; - - - private Logger log = LoggerFactory.getLogger ( this.getClass().getName () ); - public static List<String> stringToList ( String str ) - { - final LinkedList<String> set = new LinkedList<String> (); - if ( str != null ) - { - final String[] parts = str.trim ().split ( "," ); - for ( String part : parts ) - { + + private Logger log = LoggerFactory.getLogger(this.getClass().getName()); + + public static List<String> stringToList(String str) { + final LinkedList<String> set = new LinkedList<String>(); + if (str != null) { + final String[] parts = str.trim().split(","); + for (String part : parts) { final String trimmed = part.trim(); - if ( trimmed.length () > 0 ) - { - set.add ( trimmed ); + if (trimmed.length() > 0) { + set.add(trimmed); } } } return set; } - - public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException - { - this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false ); - } - - public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException - { - super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId ); + + public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, + String apiSecret_password) throws MalformedURLException { + this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, + false); + } + + public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, + boolean allowSelfSignedCerts) throws MalformedURLException { + super(hostPart, topic + "::" + consumerGroup + "::" + consumerId); fTopic = topic; fGroup = consumerGroup; @@ -94,233 +91,243 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer fLimit = limit; fFilter = filter; - //setApiCredentials ( apiKey, apiSecret ); + fHostSelector = new HostSelector(hostPart); } @Override - public Iterable<String> fetch () throws IOException,Exception - { + public Iterable<String> fetch() throws IOException, Exception { // fetch with the timeout and limit set in constructor - return fetch ( fTimeoutMs, fLimit ); + return fetch(fTimeoutMs, fLimit); } @Override - public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception - { - final LinkedList<String> msgs = new LinkedList<String> (); - -// FIXME: the timeout on the socket needs to be at least as long as the long poll -// // sanity check for long poll timeout vs. socket read timeout -// final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10; -// if ( timeoutMs > maxReasonableTimeoutMs ) -// { -// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" + -// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." ); -// timeoutMs = maxReasonableTimeoutMs; -// } - - // final String urlPath = createUrlPath ( timeoutMs, limit ); - - //getLog().info ( "UEB GET " + urlPath ); - try - { + public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception { + final LinkedList<String> msgs = new LinkedList<String>(); + + // FIXME: the timeout on the socket needs to be at least as long as the + // long poll + // // sanity check for long poll timeout vs. socket read timeout + // final int maxReasonableTimeoutMs = + // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10; + // if ( timeoutMs > maxReasonableTimeoutMs ) + // { + // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. + // socket read timeout (" + + // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll + // timeout to " + maxReasonableTimeoutMs + "." ); + // timeoutMs = maxReasonableTimeoutMs; + // } + + // final String urlPath = createUrlPath ( timeoutMs, limit ); + + // getLog().info ( "UEB GET " + urlPath ); + try { if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { DMEConfigure(timeoutMs, limit); - try - { - //getLog().info ( "Receiving msgs from: " + url+subContextPath ); - String reply = sender.sendAndWait(timeoutMs+10000L); - // System.out.println("Message received = "+reply); - final JSONObject o =getResponseDataInJson(reply); - //msgs.add(reply); - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - // final int b = o.getInt("status" ); - //if ( a != null && a.length()>0 ) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - //msgs.add("DMAAP response status: "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); - else - msgs.add ( a.getJSONObject(i).toString() ); - - + try { + // getLog().info ( "Receiving msgs from: " + + // url+subContextPath ); + String reply = sender.sendAndWait(timeoutMs + 10000L); + final JSONObject o = getResponseDataInJson(reply); + // msgs.add(reply); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + // final int b = o.getInt("status" ); + // if ( a != null && a.length()>0 ) + if (a != null) { + for (int i = 0; i < a.length(); i++) { + // msgs.add("DMAAP response status: + // "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } } + // else if(a != null && a.length()<1){ + // msgs.add ("[]"); + // } } -// else if(a != null && a.length()<1){ -// msgs.add ("[]"); -// } - } - } - catch ( JSONException e ) - { + } catch (JSONException e) { // unexpected response - reportProblemWithResponse (); - log.error("exception: ", e); + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); } - catch ( HttpException e ) - { - throw new IOException ( e ); - } } - + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit ); - - - try - { - final JSONObject o = get ( urlPath, username, password, protocolFlag ); - - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - final int b = o.getInt("status" ); - //if ( a != null && a.length()>0 ) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - msgs.add("DMAAP response status: "+Integer.toString(b)); + // final String urlPath = createUrlPath + // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, + // fId,props.getProperty("Protocol")), timeoutMs, limit ); + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + try { + final JSONObject o = get(urlPath, username, password, protocolFlag); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + final int b = o.getInt("status"); + // if ( a != null && a.length()>0 ) + if (a != null) { + for (int i = 0; i < a.length(); i++) { + // msgs.add("DMAAP response status: + // "+Integer.toString(b)); if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); + msgs.add(a.getString(i)); else - msgs.add ( a.getJSONObject(i).toString() ); - + msgs.add(a.getJSONObject(i).toString()); + } } -// else if(a != null && a.length()<1) -// { -// msgs.add ("[]"); -// } + // else if(a != null && a.length()<1) + // { + // msgs.add ("[]"); + // } } - } - catch ( JSONException e ) - { + } catch (JSONException e) { // unexpected response - reportProblemWithResponse (); - log.error("exception: ", e); - } - catch ( HttpException e ) - { - throw new IOException ( e ); + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); } - } - + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit ); - - - try - { - final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag ); - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - final int b = o.getInt("status" ); - //if ( a != null && a.length()>0) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - msgs.add("DMAAP response status: "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); - else - msgs.add ( a.getJSONObject(i).toString() ); - + final String urlPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + timeoutMs, limit); + + try { + final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + final int b = o.getInt("status"); + // if ( a != null && a.length()>0) + if (a != null) { + for (int i = 0; i < a.length(); i++) { + // msgs.add("DMAAP response status: + // "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } } + // else if(a != null && a.length()<1){ + // msgs.add ("[]"); + // } } -// else if(a != null && a.length()<1){ -// msgs.add ("[]"); -// } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); } + } - catch ( JSONException e ) - { - // unexpected response - reportProblemWithResponse (); - log.error("exception: ", e); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + try { + final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + final int b = o.getInt("status"); + // if ( a != null && a.length()>0) + if (a != null) { + for (int i = 0; i < a.length(); i++) { + // msgs.add("DMAAP response status: + // "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } + } + + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + } catch (HttpException e) { + throw new IOException(e); + } + } - - } catch ( JSONException e ) { + + } catch (JSONException e) { // unexpected response - reportProblemWithResponse (); - log.error("exception: ", e); + reportProblemWithResponse(); + log.error("exception: ", e); } catch (HttpException e) { throw new IOException(e); - } catch (Exception e ) { + } catch (Exception e) { throw e; } - return msgs; } private JSONObject getResponseDataInJson(String response) { - try { - - - //log.info("DMAAP response status: " + response.getStatus()); + try { + + // log.info("DMAAP response status: " + response.getStatus()); - // final String responseData = response.readEntity(String.class); + // final String responseData = response.readEntity(String.class); + JSONTokener jsonTokener = new JSONTokener(response); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else { + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject; + } catch (JSONException excp) { + // log.error("DMAAP - Error reading response data.", excp); + return null; + } + + } + + private JSONObject getResponseDataInJsonWithResponseReturned(String response) { JSONTokener jsonTokener = new JSONTokener(response); JSONObject jsonObject = null; final char firstChar = jsonTokener.next(); - jsonTokener.back(); + jsonTokener.back(); + if (null != response && response.length() == 0) { + return null; + } + if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); jsonObject.put("result", jsonArray); + } else if ('{' == firstChar) { + return null; + } else if ('<' == firstChar) { + return null; } else { jsonObject = new JSONObject(jsonTokener); } return jsonObject; - } catch (JSONException excp) { - // log.error("DMAAP - Error reading response data.", excp); - return null; - } - - - -} - - private JSONObject getResponseDataInJsonWithResponseReturned(String response) { - JSONTokener jsonTokener = new JSONTokener(response); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if(null != response && response.length()==0){ - return null; - } - - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else if('{' == firstChar){ - return null; - } else if('<' == firstChar){ - return null; - }else{ - jsonObject = new JSONObject(jsonTokener); - } - return jsonObject; - } - + private final String fTopic; private final String fGroup; private final String fId; @@ -330,187 +337,184 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer private String username; private String password; private String host; - private String latitude; - private String longitude; - private String version; - private String serviceName; - private String env; - private String partner; - private String routeOffer; - private String subContextPath; - private String protocol; - private String methodType; - private String url; - private String dmeuser; - private String dmepassword; - private String contenttype; - private DME2Client sender; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String consumerFilePath; - private String authKey; - private String authDate; - private Properties props; - private HashMap<String, String> DMETimeOuts; - private String handlers; - public static final String routerFilePath = null; - public static String getRouterFilePath() { - return routerFilePath; - } - - public static void setRouterFilePath(String routerFilePath) { - MRSimplerBatchPublisher.routerFilePath = routerFilePath; - } - public String getConsumerFilePath() { - return consumerFilePath; - } + HostSelector fHostSelector = null; + private String latitude; + private String longitude; + private String version; + private String serviceName; + private String env; + private String partner; + private String routeOffer; + private String subContextPath; + private String protocol; + private String methodType; + private String url; + private String dmeuser; + private String dmepassword; + private String contenttype; + private DME2Client sender; + public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + public String consumerFilePath; + private String authKey; + private String authDate; + private Properties props; + private HashMap<String, String> DMETimeOuts; + private String handlers; + public static final String routerFilePath = null; + + public static String getRouterFilePath() { + return routerFilePath; + } - public void setConsumerFilePath(String consumerFilePath) { - this.consumerFilePath = consumerFilePath; - } + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } - public String getProtocolFlag() { - return protocolFlag; - } + public String getConsumerFilePath() { + return consumerFilePath; + } + + public void setConsumerFilePath(String consumerFilePath) { + this.consumerFilePath = consumerFilePath; + } - public void setProtocolFlag(String protocolFlag) { - this.protocolFlag = protocolFlag; + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { + latitude = props.getProperty("Latitude"); + longitude = props.getProperty("Longitude"); + version = props.getProperty("Version"); + serviceName = props.getProperty("ServiceName"); + env = props.getProperty("Environment"); + partner = props.getProperty("Partner"); + routeOffer = props.getProperty("routeOffer"); + + subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; + // subContextPath=createUrlPath (subContextPath, timeoutMs, limit); + // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, + // timeoutMs); + + protocol = props.getProperty("Protocol"); + methodType = props.getProperty("MethodType"); + dmeuser = props.getProperty("username"); + dmepassword = props.getProperty("password"); + contenttype = props.getProperty("contenttype"); + handlers = props.getProperty("sessionstickinessrequired"); + // url =protocol+"://DME2SEARCH/"+ + // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner; + // url = protocol + + // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner; + + /** + * Changes to DME2Client url to use Partner for auto failover between + * data centers When Partner value is not provided use the routeOffer + * value for auto failover within a cluster + */ + + String preferredRouteKey = readRoute("preferredRouteKey"); + + if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner + + "&routeoffer=" + preferredRouteKey; + } else if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner; + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; } - - private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ - latitude = props.getProperty("Latitude"); - longitude = props.getProperty("Longitude"); - version = props.getProperty("Version"); - serviceName = props.getProperty("ServiceName"); - env = props.getProperty("Environment"); - partner = props.getProperty("Partner"); - routeOffer = props.getProperty("routeOffer"); - - subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId; - // subContextPath=createUrlPath (subContextPath, timeoutMs, limit); - //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs); - - protocol = props.getProperty("Protocol"); - methodType = props.getProperty("MethodType"); - dmeuser = props.getProperty("username"); - dmepassword = props.getProperty("password"); - contenttype = props.getProperty("contenttype"); - handlers = props.getProperty("sessionstickinessrequired"); - //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner; - // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner; - - /** - * Changes to DME2Client url to use Partner for auto failover between data centers - * When Partner value is not provided use the routeOffer value for auto failover within a cluster - */ - - String preferredRouteKey = readRoute("preferredRouteKey"); - - if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; - }else if (partner != null && !partner.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; - } - else if (routeOffer!=null && !routeOffer.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; - } - - //log.info("url :"+url); - - if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs; - if(limit != -1 )url=url+"&limit="+limit; - - DMETimeOuts = new HashMap<String, String>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - DMETimeOuts.put("Content-Type", contenttype); - System.setProperty("AFT_LATITUDE", latitude); - System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); - // System.setProperty("DME2.DEBUG", "true"); - - //SSL changes - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - "SSLv3,TLSv1,TLSv1.1"); - System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); - System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - //SSL changes - - sender = new DME2Client(new URI(url), timeoutMs+10000L); - sender.setAllowAllHttpReturnCodes(true); - sender.setMethod(methodType); - sender.setSubContext(subContextPath); - if(dmeuser != null && dmepassword != null){ + + // log.info("url :"+url); + + if (timeoutMs != -1) + url = url + "&timeout=" + timeoutMs; + if (limit != -1) + url = url + "&limit=" + limit; + + // Add filter to DME2 Url + if (fFilter != null && fFilter.length() > 0) + url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8"); + + DMETimeOuts = new HashMap<String, String>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("Content-Type", contenttype); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + // System.setProperty("DME2.DEBUG", "true"); + + // SSL changes + // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + // "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + // SSL changes + + sender = new DME2Client(new URI(url), timeoutMs + 10000L); + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + if (dmeuser != null && dmepassword != null) { sender.setCredentials(dmeuser, dmepassword); - //System.out.println(dmepassword); - } - sender.setHeaders(DMETimeOuts); - sender.setPayload(""); - - if(handlers.equalsIgnoreCase("yes")){ - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - }else{ - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); - } - /* HeaderReplyHandler headerhandler= new HeaderReplyHandler(); - sender.setReplyHandler(headerhandler);*/ -// } catch (DME2Exception x) { -// getLog().warn(x.getMessage(), x); -// System.out.println("XXXXXXXXXXXX"+x); -// } catch (URISyntaxException x) { -// System.out.println(x); -// getLog().warn(x.getMessage(), x); -// } catch (Exception x) { -// System.out.println("XXXXXXXXXXXX"+x); -// getLog().warn(x.getMessage(), x); -// } } - public Properties getProps() { - return props; + sender.setHeaders(DMETimeOuts); + sender.setPayload(""); + + if (handlers.equalsIgnoreCase("yes")) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); } + } - public void setProps(Properties props) { - this.props = props; - } + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + } - protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException - { - final StringBuffer contexturl= new StringBuffer(url); - // final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) ); - final StringBuffer adds = new StringBuffer (); - if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); - if ( limit > -1 ) - { - if ( adds.length () > 0 ) - { - adds.append ( "&" ); + protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException { + final StringBuffer contexturl = new StringBuffer(url); + // final StringBuffer url = new StringBuffer ( + // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) ); + final StringBuffer adds = new StringBuffer(); + if (timeoutMs > -1) + adds.append("timeout=").append(timeoutMs); + if (limit > -1) { + if (adds.length() > 0) { + adds.append("&"); } - adds.append ( "limit=" ).append ( limit ); + adds.append("limit=").append(limit); } - if ( fFilter != null && fFilter.length () > 0 ) - { + if (fFilter != null && fFilter.length() > 0) { try { - if ( adds.length () > 0 ) - { - adds.append ( "&" ); + if (adds.length() > 0) { + adds.append("&"); } adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e.getMessage() + "....say whaaaat?!"); } } - if ( adds.length () > 0 ) - { - contexturl.append ( "?" ).append ( adds.toString () ); + if (adds.length() > 0) { + contexturl.append("?").append(adds.toString()); } - - //sender.setSubContext(url.toString()); - return contexturl.toString (); + + // sender.setSubContext(url.toString()); + return contexturl.toString(); } public String getUsername() { @@ -560,20 +564,20 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer public void setfFilter(String fFilter) { this.fFilter = fFilter; } - + private String readRoute(String routeKey) { try { - - MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath))); + + MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath))); } catch (Exception ex) { - log.error("Reply Router Error " + ex.toString() ); + log.error("Reply Router Error " + ex.toString()); } - String routeOffer = MRClientFactory.prop.getProperty(routeKey); + String routeOffer = MRClientFactory.prop.getProperty(routeKey); return routeOffer; } - + @Override public MRConsumerResponse fetchWithReturnConsumerResponse() { @@ -582,13 +586,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer } @Override - public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, - int limit) { + public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) { final LinkedList<String> msgs = new LinkedList<String>(); MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase( - protocolFlag)) { + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { DMEConfigure(timeoutMs, limit); String reply = sender.sendAndWait(timeoutMs + 10000L); @@ -599,7 +601,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer final JSONArray a = o.getJSONArray("result"); if (a != null) { - for (int i = 0; i < a.length(); i++) { + for (int i = 0; i < a.length(); i++) { if (a.get(i) instanceof String) msgs.add(a.getString(i)); else @@ -612,15 +614,16 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer createMRConsumerResponse(reply, mrConsumerResponse); } - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase( - protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - props.getProperty("Protocol")), timeoutMs, - limit); + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + /* + * final String urlPath = createUrlPath( + * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, + * props.getProperty("Protocol")), timeoutMs, limit); + */ - String response = getResponse(urlPath, username, password, - protocolFlag); + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + String response = getResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); @@ -628,7 +631,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer final JSONArray a = o.getJSONArray("result"); if (a != null) { - for (int i = 0; i < a.length(); i++) { + for (int i = 0; i < a.length(); i++) { if (a.get(i) instanceof String) msgs.add(a.getString(i)); else @@ -641,16 +644,13 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer createMRConsumerResponse(response, mrConsumerResponse); } - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase( - protocolFlag)) { + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - props.getProperty("Protocol")), timeoutMs, - limit); + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + timeoutMs, limit); - String response = getAuthResponse(urlPath, authKey, authDate, - username, password, protocolFlag); - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); if (o != null) { final JSONArray a = o.getJSONArray("result"); @@ -667,51 +667,74 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer } createMRConsumerResponse(response, mrConsumerResponse); } - - - - } catch (JSONException e) { + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + // final String urlPath = createUrlPath( + // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, + // props.getProperty("Protocol")), timeoutMs, + // limit); + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + String response = getNoAuthResponse(urlPath, username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } + } + + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + } catch (JSONException e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("json exception: ", e); - } catch (HttpException e) { + log.error("json exception: ", e); + } catch (HttpException e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("http exception: ", e); - }catch(DME2Exception e){ + log.error("http exception: ", e); + } catch (DME2Exception e) { mrConsumerResponse.setResponseCode(e.getErrorCode()); mrConsumerResponse.setResponseMessage(e.getErrorMessage()); - log.error("DME2 exception: ", e); - }catch (Exception e) { + log.error("DME2 exception: ", e); + } catch (Exception e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("exception: ", e); + log.error("exception: ", e); } mrConsumerResponse.setActualMessages(msgs); return mrConsumerResponse; } private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { - - if(reply.startsWith("{")){ + + if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); String message = jObject.getString("message"); int status = jObject.getInt("status"); - + mrConsumerResponse.setResponseCode(Integer.toString(status)); - - if(null != message){ - mrConsumerResponse.setResponseMessage(message); - } - }else if (reply.startsWith("<")){ + + if (null != message) { + mrConsumerResponse.setResponseMessage(message); + } + } else if (reply.startsWith("<")) { mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply)); - mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - }else{ + mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } else { mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); + mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); } - + } - } diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java index 398558d..db982ec 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -47,6 +47,7 @@ import org.apache.http.HttpException; import org.apache.http.HttpStatus; import org.json.JSONArray; import org.json.JSONObject; +import org.json.JSONTokener; import com.att.aft.dme2.api.DME2Client; import com.att.aft.dme2.api.DME2Exception; @@ -55,76 +56,66 @@ import com.att.nsa.mr.client.MRBatchingPublisher; import com.att.nsa.mr.client.response.MRPublisherResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher -{ +public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); - public static class Builder - { - public Builder () - { + public static class Builder { + public Builder() { } - public Builder againstUrls ( Collection<String> baseUrls ) - { + public Builder againstUrls(Collection<String> baseUrls) { fUrls = baseUrls; return this; } - public Builder onTopic ( String topic ) - { + public Builder onTopic(String topic) { fTopic = topic; return this; } - public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs ) - { + public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { fMaxBatchSize = maxBatchSize; fMaxBatchAgeMs = maxBatchAgeMs; return this; } - public Builder compress ( boolean compress ) - { + public Builder compress(boolean compress) { fCompress = compress; return this; } - - public Builder httpThreadTime ( int threadOccuranceTime ) - { + + public Builder httpThreadTime(int threadOccuranceTime) { this.threadOccuranceTime = threadOccuranceTime; return this; } - - public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts ) - { + + public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) { fAllowSelfSignedCerts = allowSelfSignedCerts; return this; } - - public Builder withResponse ( boolean withResponse) - { + + public Builder withResponse(boolean withResponse) { fWithResponse = withResponse; return this; } - public MRSimplerBatchPublisher build () - { - if(!fWithResponse) - { + + public MRSimplerBatchPublisher build() { + if (!fWithResponse) { try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime); + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, threadOccuranceTime); } catch (MalformedURLException e) { throw new RuntimeException(e); } - } else - { + } else { try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize); + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, fMaxBatchSize); } catch (MalformedURLException e) { throw new RuntimeException(e); } } - + } private Collection<String> fUrls; @@ -135,262 +126,250 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP private int threadOccuranceTime = 50; private boolean fAllowSelfSignedCerts = false; private boolean fWithResponse = false; - + }; @Override - public int send ( String partition, String msg ) - { - return send ( new message ( partition, msg ) ); + public int send(String partition, String msg) { + return send(new message(partition, msg)); } + @Override - public int send ( String msg ) - { - return send ( new message ( null, msg ) ); + public int send(String msg) { + return send(new message(null, msg)); } - @Override - public int send ( message msg ) - { - final LinkedList<message> list = new LinkedList<message> (); - list.add ( msg ); - return send ( list ); + public int send(message msg) { + final LinkedList<message> list = new LinkedList<message>(); + list.add(msg); + return send(list); } - - @Override - public synchronized int send ( Collection<message> msgs ) - { - if ( fClosed ) - { - throw new IllegalStateException ( "The publisher was closed." ); + public synchronized int send(Collection<message> msgs) { + if (fClosed) { + throw new IllegalStateException("The publisher was closed."); } - - for ( message userMsg : msgs ) - { - fPending.add ( new TimestampedMessage ( userMsg ) ); + + for (message userMsg : msgs) { + fPending.add(new TimestampedMessage(userMsg)); } - return getPendingMessageCount (); + return getPendingMessageCount(); } @Override - public synchronized int getPendingMessageCount () - { - return fPending.size (); + public synchronized int getPendingMessageCount() { + return fPending.size(); } @Override - public void close () - { - try - { - final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); - if ( remains.size() > 0 ) - { - getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. " - + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." ); + public void close() { + try { + final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.size() > 0) { + getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); } - } - catch ( InterruptedException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); - } - catch ( IOException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); + } catch (InterruptedException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } catch (IOException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); } } @Override - public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException - { - synchronized ( this ) - { + public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException { + synchronized (this) { fClosed = true; // stop the background sender - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); - fExec.shutdown (); + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); } - final long now = Clock.now (); - final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); + final long now = Clock.now(); + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); final long timeoutAtMs = now + waitInMs; - while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 ) - { - send ( true ); - Thread.sleep ( 250 ); + while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { + send(true); + Thread.sleep(250); } - synchronized ( this ) - { - final LinkedList<message> result = new LinkedList<message> (); - fPending.drainTo ( result ); + synchronized (this) { + final LinkedList<message> result = new LinkedList<message>(); + fPending.drainTo(result); return result; } } /** - * Possibly send a batch to the MR server. This is called by the background thread - * and the close() method + * Possibly send a batch to the MR server. This is called by the background + * thread and the close() method * * @param force */ - private synchronized void send ( boolean force ) - { - if ( force || shouldSendNow () ) - { - if ( !sendBatch () ) - { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + private synchronized void send(boolean force) { + if (force || shouldSendNow()) { + if (!sendBatch()) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + Clock.now (); + fDontSendUntilMs = sfWaitAfterError + Clock.now(); } } } - private synchronized boolean shouldSendNow () - { + private synchronized boolean shouldSendNow() { boolean shouldSend = false; - if ( fPending.size () > 0 ) - { - final long nowMs = Clock.now (); + if (fPending.size() > 0) { + final long nowMs = Clock.now(); - shouldSend = ( fPending.size() >= fMaxBatchSize ); - if ( !shouldSend ) - { + shouldSend = (fPending.size() >= fMaxBatchSize); + if (!shouldSend) { final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; shouldSend = sendAtMs <= nowMs; } // however, wait after an error - shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; } return shouldSend; } - private synchronized boolean sendBatch () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { + /** + * Method to parse published JSON Objects and Arrays + * + * @return JSONArray + */ + private JSONArray parseJSON() { + JSONArray jsonArray = new JSONArray(); + for (TimestampedMessage m : fPending) { + JSONTokener jsonTokener = new JSONTokener(m.fMsg); + JSONObject jsonObject = null; + JSONArray tempjsonArray = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + tempjsonArray = new JSONArray(jsonTokener); + if (null != tempjsonArray) { + for (int i = 0; i < tempjsonArray.length(); i++) { + jsonArray.put(tempjsonArray.getJSONObject(i)); + } + } + } else { + jsonObject = new JSONObject(jsonTokener); + jsonArray.put(jsonObject); + } + + } + return jsonArray; + } + + private synchronized boolean sendBatch() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { return true; } - final long nowMs = Clock.now (); - + final long nowMs = Clock.now(); + host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - - - try - { - /*final String contentType = - fCompress ? - MRFormat.CAMBRIA_ZIP.toString () : - MRFormat.CAMBRIA.toString () - ;*/ - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + + try { + /* + * final String contentType = fCompress ? + * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ; + */ + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + os.close(); + + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); } - os.write (jsonArray.toString().getBytes() ); os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } - os.close (); } - - + os.close(); + } - final long startMs = Clock.now (); + final long startMs = Clock.now(); if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - + DME2Configue(); - + Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" - + dmeResponse.toString(); + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString(); getLog().info(logLine); fPending.clear(); return true; - } - + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, + username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { return false; } final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); getLog().info(logLine); fPending.clear(); return true; - } - + } + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, + protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { return false; } final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); @@ -398,118 +377,100 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP fPending.clear(); return true; } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); + } catch (IOException x) { + getLog().warn(x.getMessage(), x); } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); + getLog().warn(x.getMessage(), x); } catch (Exception x) { getLog().warn(x.getMessage(), x); } return false; } - public synchronized MRPublisherResponse sendBatchWithResponse () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { + public synchronized MRPublisherResponse sendBatchWithResponse() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage("No Messages to send"); return pubResponse; } - final long nowMs = Clock.now (); - + final long nowMs = Clock.now(); + host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - OutputStream os=null; - try - { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - os = baseStream; + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + OutputStream os = null; + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + os = baseStream; final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); } - os.write (jsonArray.toString().getBytes() ); - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); } - - + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); - final long startMs = Clock.now (); + } + } + + final long startMs = Clock.now(); if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - + try { - DME2Configue(); - - Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); - - - String dmeResponse = sender.sendAndWait(5000L); - System.out.println("dmeres->"+dmeResponse); - - - pubResponse = createMRPublisherResponse(dmeResponse,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - final String logLine = String.valueOf((Clock.now() - startMs)) - + dmeResponse.toString(); - getLog().info(logLine); - fPending.clear(); - - } - catch (DME2Exception x) { + DME2Configue(); + + Thread.sleep(5); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); + + String dmeResponse = sender.sendAndWait(5000L); + + pubResponse = createMRPublisherResponse(dmeResponse, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + + } catch (DME2Exception x) { getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(x.getErrorCode()); pubResponse.setResponseMessage(x.getErrorMessage()); } catch (URISyntaxException x) { - + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); @@ -517,135 +478,127 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - logger.error("exception: ", x); - + logger.error("exception: ", x); + } - + return pubResponse; - } - + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - - - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, + authDate, username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + return pubResponse; } - + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); getLog().info(logLine); fPending.clear(); return pubResponse; - } - + } + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, + password, protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + return pubResponse; } - + final String logLine = String.valueOf((Clock.now() - startMs)); getLog().info(logLine); fPending.clear(); return pubResponse; } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); - - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); + + } catch (IOException x) { + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - + } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); - + } catch (Exception x) { getLog().warn(x.getMessage(), x); - + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - + } - + finally { - if (fPending.size()>0) { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + if (fPending.size() > 0) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); pubResponse.setPendingMsgs(fPending.size()); } if (os != null) { try { - os.close(); + os.close(); } catch (Exception x) { getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage("Error in closing Output Stream"); } - } + } } - + return pubResponse; } - -private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { - - if (reply.isEmpty()) - { - - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - mrPubResponse.setResponseMessage("Please verify the Producer properties"); - } - else if(reply.startsWith("{")) - { + + private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { + + if (reply.isEmpty()) { + + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + mrPubResponse.setResponseMessage("Please verify the Producer properties"); + } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if(jObject.has("message") && jObject.has("status")) - { + if (jObject.has("message") && jObject.has("status")) { String message = jObject.getString("message"); - if(null != message) - { - mrPubResponse.setResponseMessage(message); + if (null != message) { + mrPubResponse.setResponseMessage(message); } mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + } else { + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrPubResponse.setResponseMessage(reply); } - else - { - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrPubResponse.setResponseMessage(reply); - } - } - else if (reply.startsWith("<")) - { - String responseCode = getHTTPErrorResponseCode(reply); - if( responseCode.contains("403")) - { - responseCode = "403"; - } - mrPubResponse.setResponseCode(responseCode); - mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - } - + } else if (reply.startsWith("<")) { + String responseCode = getHTTPErrorResponseCode(reply); + if (responseCode.contains("403")) { + responseCode = "403"; + } + mrPubResponse.setResponseCode(responseCode); + mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } + return mrPubResponse; } @@ -658,10 +611,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR private String username; private String password; private String host; - - //host selector + + // host selector private HostSelector fHostSelector = null; - + private final LinkedBlockingQueue<TimestampedMessage> fPending; private long fDontSendUntilMs; private final ScheduledThreadPoolExecutor fExec; @@ -684,25 +637,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR private HashMap<String, String> DMETimeOuts; private DME2Client sender; public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String producerFilePath; private String authKey; private String authDate; private String handlers; private Properties props; public static String routerFilePath; - protected static final Map<String, String> headers=new HashMap<String, String>(); + protected static final Map<String, String> headers = new HashMap<String, String>(); public static MultivaluedMap<String, Object> headersMap; - - + private MRPublisherResponse pubResponse; - + public MRPublisherResponse getPubResponse() { return pubResponse; } + public void setPubResponse(MRPublisherResponse pubResponse) { this.pubResponse = pubResponse; } - + public static String getRouterFilePath() { return routerFilePath; } @@ -719,14 +671,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR this.props = props; } - public String getProducerFilePath() { - return producerFilePath; - } - - public void setProducerFilePath(String producerFilePath) { - this.producerFilePath = producerFilePath; - } - public String getProtocolFlag() { return protocolFlag; } @@ -734,14 +678,14 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR public void setProtocolFlag(String protocolFlag) { this.protocolFlag = protocolFlag; } - - + private void DME2Configue() throws Exception { try { - - /* FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); - props.load(reader);*/ + + /* + * FileReader reader = new FileReader(new File (producerFilePath)); + * Properties props = new Properties(); props.load(reader); + */ latitude = props.getProperty("Latitude"); longitude = props.getProperty("Longitude"); version = props.getProperty("Version"); @@ -749,41 +693,43 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR env = props.getProperty("Environment"); partner = props.getProperty("Partner"); routeOffer = props.getProperty("routeOffer"); - subContextPath = props.getProperty("SubContextPath")+fTopic; - /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){ - subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition"); - }*/ + subContextPath = props.getProperty("SubContextPath") + fTopic; + /* + * if(props.getProperty("partition")!=null && + * !props.getProperty("partition").equalsIgnoreCase("")){ + * subContextPath=subContextPath+"?partitionKey="+props.getProperty( + * "partition"); } + */ protocol = props.getProperty("Protocol"); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); contentType = props.getProperty("contenttype"); handlers = props.getProperty("sessionstickinessrequired"); - routerFilePath= props.getProperty("DME2preferredRouterFilePath"); - + routerFilePath = props.getProperty("DME2preferredRouterFilePath"); + /** - * Changes to DME2Client url to use Partner for auto failover between data centers - * When Partner value is not provided use the routeOffer value for auto failover within a cluster + * Changes to DME2Client url to use Partner for auto failover + * between data centers When Partner value is not provided use the + * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); - - if (partner != null && !partner.isEmpty() ) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } - } - else if (routeOffer!=null && !routeOffer.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } + + if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + + partner; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } } - + DMETimeOuts = new HashMap<String, String>(); DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); @@ -791,56 +737,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR DMETimeOuts.put("Content-Type", contentType); System.setProperty("AFT_LATITUDE", latitude); System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); - //System.setProperty("DME2.DEBUG", "true"); - // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true"); - //System.out.println("XXXXXX"+url); - - //SSL changes - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + // System.setProperty("DME2.DEBUG", "true"); + + // SSL changes + // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + // "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - - //SSL changes - + + // SSL changes + sender = new DME2Client(new URI(url), 5000L); - + sender.setAllowAllHttpReturnCodes(true); sender.setMethod(methodType); - sender.setSubContext(subContextPath); + sender.setSubContext(subContextPath); sender.setCredentials(dmeuser, dmepassword); sender.setHeaders(DMETimeOuts); - if(handlers.equalsIgnoreCase("yes")){ - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + if (handlers.equalsIgnoreCase("yes")) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - }else{ - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); - } + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } } catch (DME2Exception x) { getLog().warn(x.getMessage(), x); - throw new DME2Exception(x.getErrorCode(),x.getErrorMessage()); + throw new DME2Exception(x.getErrorCode(), x.getErrorMessage()); } catch (URISyntaxException x) { - + getLog().warn(x.getMessage(), x); - throw new URISyntaxException(url,x.getMessage()); + throw new URISyntaxException(url, x.getMessage()); } catch (Exception x) { getLog().warn(x.getMessage(), x); throw new Exception(x.getMessage()); } } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException - { - super ( hosts ); - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); } - + fHostSelector = new HostSelector(hosts, null); fClosed = false; fTopic = topic; @@ -848,49 +794,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR fMaxBatchAgeMs = maxBatchAgeMs; fCompress = compress; - fPending = new LinkedBlockingQueue<TimestampedMessage> (); + fPending = new LinkedBlockingQueue<TimestampedMessage>(); fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); + fExec = new ScheduledThreadPoolExecutor(1); pubResponse = new MRPublisherResponse(); - + } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException - { - super ( hosts ); - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); } - + fHostSelector = new HostSelector(hosts, null); fClosed = false; fTopic = topic; fMaxBatchSize = maxBatchSize; fMaxBatchAgeMs = maxBatchAgeMs; fCompress = compress; - threadOccuranceTime=httpThreadOccurnace; - fPending = new LinkedBlockingQueue<TimestampedMessage> (); + threadOccuranceTime = httpThreadOccurnace; + fPending = new LinkedBlockingQueue<TimestampedMessage>(); fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); - fExec.scheduleAtFixedRate ( new Runnable() - { + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(new Runnable() { @Override - public void run () - { - send ( false ); + public void run() { + send(false); } - }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS ); + }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS); } - private static class TimestampedMessage extends message - { - public TimestampedMessage ( message m ) - { - super ( m ); + private static class TimestampedMessage extends message { + public TimestampedMessage(message m) { + super(m); timestamp = Clock.now(); } + public final long timestamp; } @@ -941,5 +883,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR public void setAuthDate(String authDate) { this.authDate = authDate; } - + } diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java index bdd15d4..2886db5 100644 --- a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java +++ b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java @@ -38,14 +38,14 @@ public class SimpleExampleConsumer { private static final Logger logger = LoggerFactory.getLogger(SimpleExampleConsumer.class); - private SimpleExampleConsumer() { - } + private SimpleExampleConsumer() { + } public static void main(String[] args) { long count = 0; long nextReport = 5000; - String key; + String key; final long startMs = System.currentTimeMillis(); @@ -54,24 +54,24 @@ public class SimpleExampleConsumer { final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties"); while (true) { for (String msg : cc.fetch()) { - logger.debug("Message Received: " + msg); + logger.debug("Message Received: " + msg); } // Header for DME2 Call. MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; - for (MultivaluedMap.Entry<String,List<Object>> entry: headersMap.entrySet()) { - key = entry.getKey(); - logger.debug("Header Key " + key); - logger.debug("Header Value " + headersMap.get(key)); + for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) { + key = entry.getKey(); + logger.debug("Header Key " + key); + logger.debug("Header Value " + headersMap.get(key)); } // Header for HTTP Call. - - Map<String, String> dme2headersMap=MRClientFactory.DME2HeadersMap; - for(Map.Entry<String,String> entry: dme2headersMap.entrySet()) { - key = entry.getKey(); - logger.debug("Header Key " + key); - logger.debug("Header Value " + dme2headersMap.get(key)); - } - + + Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap; + for (Map.Entry<String, String> entry : dme2headersMap.entrySet()) { + key = entry.getKey(); + logger.debug("Header Key " + key); + logger.debug("Header Value " + dme2headersMap.get(key)); + } + if (count > nextReport) { nextReport += 5000; @@ -79,11 +79,10 @@ public class SimpleExampleConsumer { final long elapsedMs = endMs - startMs; final double elapsedSec = elapsedMs / 1000.0; final double eps = count / elapsedSec; - logger.error("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps"); } } } catch (Exception x) { - logger.error(x.getClass().getName() + ": " + x.getMessage()); + logger.error(x.getClass().getName() + ": " + x.getMessage()); } } } diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java index 6e86d47..a4a176e 100644 --- a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java +++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java @@ -29,11 +29,9 @@ package com.att.nsa.mr.test.clients; * */ public enum ProtocolTypeConstants { - - DME2("DME2"), - AAF_AUTH("HTTPAAF"), - AUTH_KEY("HTTPAUTH"); - + + DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH"); + private String value; private ProtocolTypeConstants(String value) { diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java index 5ae36d2..0e3ee5a 100644 --- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java @@ -33,57 +33,52 @@ import org.slf4j.LoggerFactory; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; -public class SimpleExampleConsumer -{ +public class SimpleExampleConsumer { - static FileWriter routeWriter= null; - static Properties props=null; - static FileReader routeReader=null; - public static void main ( String[] args ) - { + static FileWriter routeWriter = null; + static Properties props = null; + static FileReader routeReader = null; + + public static void main(String[] args) { final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class); - + long count = 0; long nextReport = 5000; - final long startMs = System.currentTimeMillis (); - - try - { - String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; - - - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File (routeFilePath)); - } - routeReader= new FileReader(new File (routeFilePath)); - props= new Properties(); - final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" ); - while ( true ) - { - for ( String msg : cc.fetch () ) - { - //System.out.println ( "" + (++count) + ": " + msg ); + final long startMs = System.currentTimeMillis(); + + try { + String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt"; + + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } + routeReader = new FileReader(new File(routeFilePath)); + props = new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties"); + int i = 0; + while (i < 10) { + Thread.sleep(2); + i++; + for (String msg : cc.fetch()) { + // System.out.println ( "" + (++count) + ": " + msg ); System.out.println(msg); } - - if ( count > nextReport ) - { + + if (count > nextReport) { nextReport += 5000; - - final long endMs = System.currentTimeMillis (); + + final long endMs = System.currentTimeMillis(); final long elapsedMs = endMs - startMs; final double elapsedSec = elapsedMs / 1000.0; final double eps = count / elapsedSec; - System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps"); } } - } - catch ( Exception x ) - { - System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); - LOG.error("exception: ", x); + } catch (Exception x) { + System.err.println(x.getClass().getName() + ": " + x.getMessage()); + LOG.error("exception: ", x); } } } |