summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorsunil.unnava <su622b@att.com>2018-01-23 15:26:15 -0500
committersunil.unnava <su622b@att.com>2018-01-23 15:36:05 -0500
commit85c21e1d85c545717affd3f18cd8e9fe6dc14562 (patch)
tree06909dffa1ac3cb95f08aa1dcebfe32708578e0a /src/main
parent0497d0508a62ff513ff6883c2f6e1947da968d37 (diff)
Changes to the DMaap Client
Added new API to the DMaapClient Issue-ID: DMAAP-214 Change-Id: I4de2da7ca42ad1b5925a2df9d26672875dd15b10 Signed-off-by: sunil.unnava <su622b@att.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientFactory.java721
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java386
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java855
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java898
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java35
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java8
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java71
7 files changed, 1538 insertions, 1436 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
index 59e472c..b654282 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
@@ -43,158 +43,205 @@ import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
/**
* A factory for MR clients.<br/>
* <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
- * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
- * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
- * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
- * them. Be sure to use a different ID for each instance.)<br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each
+ * instance.)<br/>
* <br/>
- * Publishers
+ * Publishers
*
* @author author
*/
-public class MRClientFactory
-{
+public class MRClientFactory {
public static MultivaluedMap<String, Object> HTTPHeadersMap;
public static Map<String, String> DME2HeadersMap;
public static String routeFilePath;
-
+
public static FileReader routeReader;
-
- public static FileWriter routeWriter= null;
- public static Properties prop=null;
- //routeReader= new FileReader(new File (routeFilePath));
- //props= new Properties();
+
+ public static FileWriter routeWriter = null;
+ public static Properties prop = null;
+
+ // routeReader= new FileReader(new File (routeFilePath));
+ // props= new Properties();
/**
- * Create a consumer instance with the default timeout and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
*
- * @param hostList A comma separated list of hosts to use to connect to MR.
- * You can include port numbers (3904 is the default). For example, "hostname:8080,"
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "hostname:8080,"
*
- * @param topic The topic to consume
+ * @param topic
+ * The topic to consume
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( String hostList, String topic )
- {
- return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
+ public static MRConsumer createConsumer(String hostList, String topic) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
}
/**
- * Create a consumer instance with the default timeout and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
- {
- return createConsumer ( hostSet, topic, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+ return createConsumer(hostSet, topic, null);
}
/**
- * Create a consumer instance with server-side filtering, the default timeout, and no limit
- * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
+ * Create a consumer instance with server-side filtering, the default
+ * timeout, and no limit on messages returned. This consumer operates as an
+ * independent consumer (i.e., not in a group) and is NOT re-startable
* across sessions.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param filter a filter to use on the server side
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param filter
+ * a filter to use on the server side
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
- {
- return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+ return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart.
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
- {
- return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart.
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
*
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
- {
- return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart. This consumer also uses
- * server-side filtering.
- *
- * @param hostList A comma separated list of hosts to use to connect to MR.
- * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
- {
- return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
- consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+ public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+ filter, apiKey, apiSecret);
}
/**
- * Create a consumer instance with the default timeout, and no limit
- * on messages returned. This consumer can operate in a logical group and is re-startable
- * across sessions when you use the same group and ID on restart. This consumer also uses
- * server-side filtering.
- *
- * @param hostSet The host used in the URL to MR. Entries can be "host:port".
- * @param topic The topic to consume
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consume in its group
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
*
* @return a consumer
*/
- public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
- {
- if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ if (MRClientBuilders.sfConsumerMock != null)
+ return MRClientBuilders.sfConsumerMock;
try {
- return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+ return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+ apiSecret);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -203,282 +250,339 @@ public class MRClientFactory
/*************************************************************************/
/*************************************************************************/
/*************************************************************************/
-
+
/**
- * Create a publisher that sends each message (or group of messages) immediately. Most
- * applications should favor higher latency for much higher message throughput and the
- * "simple publisher" is not a good choice.
- *
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
+ * Create a publisher that sends each message (or group of messages)
+ * immediately. Most applications should favor higher latency for much
+ * higher message throughput and the "simple publisher" is not a good
+ * choice.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
* @return a publisher
*/
- public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
- {
- return createBatchingPublisher ( hostlist, topic, 1, 1 );
+ public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+ return createBatchingPublisher(hostlist, topic, 1, 1);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown. Message payloads are
+ * not compressed.
*
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
- {
- return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs) {
+ return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- final TreeSet<String> hosts = new TreeSet<String> ();
- for ( String hp : hostSet )
- {
- hosts.add ( hp );
+ public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ final TreeSet<String> hosts = new TreeSet<String>();
+ for (String hp : hostSet) {
+ hosts.add(hp);
}
- return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
+ return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
}
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
*
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
*
* @return a publisher
*/
- public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- return new MRSimplerBatchPublisher.Builder ().
- againstUrls ( hostSet ).
- onTopic ( topic ).
- batchTo ( maxBatchSize, maxAgeMs ).
- compress ( compress ).
- build ();
+ public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+ int maxBatchSize, long maxAgeMs, boolean compress) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
}
-
+
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown.
- * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param topic The topic on which to publish messages.
- * @param username username
- * @param password password
- * @param maxBatchSize The largest set of messages to batch
- * @param maxAgeMs The maximum age of a message waiting in a batch
- * @param compress use gzip compression
- * @param protocolFlag http auth or ueb auth or dme2 method
- * @param producerFilePath all properties for publisher
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param host
+ * A host to be used in the URL to MR. Can be "host:port". Use
+ * multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param username
+ * username
+ * @param password
+ * password
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ * @param protocolFlag
+ * http auth or ueb auth or dme2 method
+ * @param producerFilePath
+ * all properties for publisher
* @return MRBatchingPublisher obj
*/
- public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
- {
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(host)).
- onTopic ( topic ).
- batchTo ( maxBatchSize, maxAgeMs ).
- compress ( compress ).
- build ();
-
+ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+ final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
+ String producerFilePath) {
+ MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+
pub.setHost(host);
pub.setUsername(username);
pub.setPassword(password);
pub.setProtocolFlag(protocolFlag);
- pub.setProducerFilePath(producerFilePath);
return pub;
}
-
-
+
/**
- * Create a publisher that batches messages. Be sure to close the publisher to
- * send the last batch and ensure a clean shutdown
- * @param producerFilePath set all properties for publishing message
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException exc
- * @throws IOException ioex
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
*/
- public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
+ public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, withResponse);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
props.load(reader);
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
- onTopic ( props.getProperty("topic") ).
- batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
- compress (Boolean.parseBoolean(props.getProperty("compress"))).
- httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
- build ();
- pub.setHost(props.getProperty("host"));
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-
- pub.setAuthKey(props.getProperty("authKey"));
- pub.setAuthDate(props.getProperty("authDate"));
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- }else{
- pub.setUsername(props.getProperty("username"));
- pub.setPassword(props.getProperty("password"));
- }
- pub.setProducerFilePath(producerFilePath);
- pub.setProtocolFlag(props.getProperty("TransportType"));
- pub.setProps(props);
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
- }
- //pub.setContentType(contentType);
- return pub;
+ return createBatchingPublisher(props);
}
-
+
/**
- * Create a publisher that will contain send methods that return
- * response object to user.
- * @param producerFilePath set all properties for publishing message
+ * Create a publisher that will contain send methods that return response
+ * object to user.
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException exc
- * @throws IOException ioex
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
*/
- public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
props.load(reader);
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
- onTopic ( props.getProperty("topic") ).
- batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
- compress (Boolean.parseBoolean(props.getProperty("compress"))).
- httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
- withResponse(withResponse).
- build ();
+ return createBatchingPublisher(props, withResponse);
+ }
+
+ protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ assert props != null;
+ MRSimplerBatchPublisher pub;
+ if (withResponse) {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .onTopic(props.getProperty("topic"))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+ .withResponse(withResponse).build();
+ } else {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .onTopic(props.getProperty("topic"))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+ }
pub.setHost(props.getProperty("host"));
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-
+ if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
pub.setAuthKey(props.getProperty("authKey"));
pub.setAuthDate(props.getProperty("authDate"));
pub.setUsername(props.getProperty("username"));
pub.setPassword(props.getProperty("password"));
- }else{
+ } else {
pub.setUsername(props.getProperty("username"));
pub.setPassword(props.getProperty("password"));
}
- pub.setProducerFilePath(producerFilePath);
pub.setProtocolFlag(props.getProperty("TransportType"));
pub.setProps(props);
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
+ routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
}
- //pub.setContentType(contentType);
return pub;
}
-
-
-
-
-
-
-
-
-
-
/**
* Create an identity manager client to work with API keys.
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param apiKey Your API key
- * @param apiSecret Your API secret
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
* @return an identity manager
*/
- public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
- {
+ public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
MRIdentityManager cim;
try {
- cim = new MRMetaClient ( hostSet );
+ cim = new MRMetaClient(hostSet);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- cim.setApiCredentials ( apiKey, apiSecret );
+ cim.setApiCredentials(apiKey, apiSecret);
return cim;
}
/**
* Create a topic manager for working with topics.
- * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
- * @param apiKey Your API key
- * @param apiSecret Your API secret
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
* @return a topic manager
*/
- public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
- {
+ public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
MRMetaClient tmi;
try {
- tmi = new MRMetaClient ( hostSet );
+ tmi = new MRMetaClient(hostSet);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- tmi.setApiCredentials ( apiKey, apiSecret );
+ tmi.setApiCredentials(apiKey, apiSecret);
return tmi;
}
/**
* Inject a consumer. Used to support unit tests.
+ *
* @param cc
*/
- public static void $testInject ( MRConsumer cc )
- {
+ public static void $testInject(MRConsumer cc) {
MRClientBuilders.sfConsumerMock = cc;
}
- public static MRConsumer createConsumer(String host, String topic, String username,
- String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, int i, int j, String protocalFlag, String consumerFilePath) {
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -488,15 +592,15 @@ public class MRClientFactory
sub.setProtocolFlag(protocalFlag);
sub.setConsumerFilePath(consumerFilePath);
return sub;
-
+
}
-
- public static MRConsumer createConsumer(String host, String topic, String username,
- String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, String protocalFlag, String consumerFilePath, int i, int j) {
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -506,52 +610,61 @@ public class MRClientFactory
sub.setProtocolFlag(protocalFlag);
sub.setConsumerFilePath(consumerFilePath);
return sub;
-
+
}
- public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
- FileReader reader = new FileReader(new File (consumerFilePath));
- Properties props = new Properties();
+ public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(consumerFilePath));
+ Properties props = new Properties();
props.load(reader);
+
+ return createConsumer(props);
+ }
+
+ public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
int timeout;
- if(props.getProperty("timeout")!=null)
- timeout=Integer.parseInt(props.getProperty("timeout"));
+ if (props.getProperty("timeout") != null)
+ timeout = Integer.parseInt(props.getProperty("timeout"));
else
- timeout=-1;
+ timeout = -1;
int limit;
- if(props.getProperty("limit")!=null)
- limit=Integer.parseInt(props.getProperty("limit"));
+ if (props.getProperty("limit") != null)
+ limit = Integer.parseInt(props.getProperty("limit"));
else
- limit=-1;
+ limit = -1;
String group;
- if(props.getProperty("group")==null)
- group=UUID.randomUUID ().toString();
+ if (props.getProperty("group") == null)
+ group = UUID.randomUUID().toString();
else
- group=props.getProperty("group");
- MRConsumerImpl sub=null;
- if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") );
+ group = props.getProperty("group");
+ MRConsumerImpl sub = null;
+ if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty("authKey"), props.getProperty("authDate"));
sub.setAuthKey(props.getProperty("authKey"));
sub.setAuthDate(props.getProperty("authDate"));
sub.setUsername(props.getProperty("username"));
sub.setPassword(props.getProperty("password"));
- }else{
- sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") );
+ } else {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty("username"), props.getProperty("password"));
sub.setUsername(props.getProperty("username"));
sub.setPassword(props.getProperty("password"));
}
sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
- sub.setProps(props);
+ sub.setProps(props);
sub.setHost(props.getProperty("host"));
sub.setProtocolFlag(props.getProperty("TransportType"));
- //sub.setConsumerFilePath(consumerFilePath);
+ // sub.setConsumerFilePath(consumerFilePath);
sub.setfFilter(props.getProperty("filter"));
- routeFilePath=props.getProperty("DME2preferredRouterFilePath");
- routeReader= new FileReader(new File (routeFilePath));
- prop= new Properties();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File(routeFilePath));
+ routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
}
return sub;
}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
index 012e95e..999d7ef 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
@@ -50,286 +50,318 @@ import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
//import com.fasterxml.jackson.core.JsonProcessingException;
-public class MRBaseClient extends HttpClient implements MRClient
-{
-
+public class MRBaseClient extends HttpClient implements MRClient {
+
private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
private static final String MR_DATE_CONSTANT = "X-CambriaDate";
-
- protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException
- {
- super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort );
-
- fLog = LoggerFactory.getLogger ( this.getClass().getName () );
- }
- protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException {
- super ( ConnectionType.HTTP,hosts, stdSvcPort);
+ protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
- fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
}
- protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
- {
- super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
+ protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, stdSvcPort);
- fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
}
+ protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
+ TimeUnit.MILLISECONDS, 32, 32, 600000);
+
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
+ }
@Override
- public void close ()
- {
+ public void close() {
}
- protected Set<String> jsonArrayToSet ( JSONArray a )
- {
- if ( a == null ) return null;
+ protected Set<String> jsonArrayToSet(JSONArray a) {
+ if (a == null)
+ return null;
- final TreeSet<String> set = new TreeSet<String> ();
- for ( int i=0; i<a.length (); i++ )
- {
- set.add ( a.getString ( i ));
+ final TreeSet<String> set = new TreeSet<String>();
+ for (int i = 0; i < a.length(); i++) {
+ set.add(a.getString(i));
}
return set;
}
- public void logTo ( Logger log )
- {
+ public void logTo(Logger log) {
fLog = log;
- replaceLogger ( log );
+ replaceLogger(log);
}
- protected Logger getLog ()
- {
+ protected Logger getLog() {
return fLog;
}
private Logger fLog;
-
- public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+ public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
if ((null != username && null != password)) {
WebTarget target = null;
Response response = null;
-
+
target = getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username+":"+password);
-
-
- response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = target.request().header("Authorization", "Basic " + encoding)
+ .post(Entity.entity(data, contentType));
+
return getResponseDataInJson(response);
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
}
}
- public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+ public String postWithResponse(final String path, final byte[] data, final String contentType,
+ final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
String responseData = null;
if ((null != username && null != password)) {
WebTarget target = null;
Response response = null;
-
+
target = getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username+":"+password);
-
-
- response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = target.request().header("Authorization", "Basic " + encoding)
+ .post(Entity.entity(data, contentType));
+
responseData = response.readEntity(String.class);
return responseData;
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
}
}
- public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+ public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,
+ final String authDate, final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
if ((null != username && null != password)) {
WebTarget target = null;
Response response = null;
- target= getTarget(path,username, password);
- response = target.request()
- .header(MR_AUTH_CONSTANT, authKey)
- .header(MR_DATE_CONSTANT, authDate)
- .post(Entity.entity(data, contentType));
-
+ target = getTarget(path, username, password);
+ response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+ .post(Entity.entity(data, contentType));
+
return getResponseDataInJson(response);
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
}
}
- public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+ public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
+ final String authKey, final String authDate, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
String responseData = null;
if ((null != username && null != password)) {
WebTarget target = null;
Response response = null;
- target= getTarget(path,username, password);
- response = target.request()
- .header(MR_AUTH_CONSTANT, authKey)
- .header(MR_DATE_CONSTANT, authDate)
- .post(Entity.entity(data, contentType));
- responseData = response.readEntity(String.class);
- return responseData;
-
+ target = getTarget(path, username, password);
+ response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+ .post(Entity.entity(data, contentType));
+ responseData = response.readEntity(String.class);
+ return responseData;
+
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
}
}
-
- public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+ public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
if (null != username && null != password) {
-
+
WebTarget target = null;
Response response = null;
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target=getTarget(path);
- response = target.request()
- .header(MR_AUTH_CONSTANT, username)
- .header(MR_DATE_CONSTANT, password)
- .get();
+ target = getTarget(path);
+ response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
} else {
target = getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username+":"+password);
-
- response = target.request().header("Authorization", "Basic " + encoding).get();
-
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = target.request().header("Authorization", "Basic " + encoding).get();
+
}
return getResponseDataInJson(response);
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
}
}
-
-
- public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+ public String getResponse(final String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
String responseData = null;
if (null != username && null != password) {
-
+
WebTarget target = null;
Response response = null;
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target=getTarget(path);
- response = target.request()
- .header(MR_AUTH_CONSTANT, username)
- .header(MR_DATE_CONSTANT, password)
- .get();
+ target = getTarget(path);
+ response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
} else {
target = getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username+":"+password);
- response = target.request().header("Authorization", "Basic " + encoding).get();
+ String encoding = Base64.encodeAsString(username + ":" + password);
+ response = target.request().header("Authorization", "Basic " + encoding).get();
}
- MRClientFactory.HTTPHeadersMap=response.getHeaders();
-
- String transactionid=response.getHeaderString("transactionid");
- if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
}
-
+
responseData = response.readEntity(String.class);
return responseData;
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
}
}
-
- public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+ public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
if (null != username && null != password) {
-
+
WebTarget target = null;
Response response = null;
- target=getTarget(path, username, password);
- response = target.request()
- .header(MR_AUTH_CONSTANT, authKey)
- .header(MR_DATE_CONSTANT, authDate)
- .get();
-
+ target = getTarget(path, username, password);
+ response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
return getResponseDataInJson(response);
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
}
}
-
- public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+ public JSONObject getNoAuth(final String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ if (null != username && null != password) {
+
+ WebTarget target = null;
+
+ Response response = null;
+ target = getTarget(path, username, password);
+ response = target.request().get();
+
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
String responseData = null;
if (null != username && null != password) {
-
+
WebTarget target = null;
Response response = null;
- target=getTarget(path, username, password);
- response = target.request()
- .header(MR_AUTH_CONSTANT, authKey)
- .header(MR_DATE_CONSTANT, authDate)
- .get();
-
- MRClientFactory.HTTPHeadersMap=response.getHeaders();
-
- String transactionid=response.getHeaderString("transactionid");
- if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
- }
-
- responseData = response.readEntity(String.class);
- return responseData;
+ target = getTarget(path, username, password);
+ response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
+ }
+
+ responseData = response.readEntity(String.class);
+ return responseData;
} else {
- throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String getNoAuthResponse(String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+
+ WebTarget target = null;
+
+ Response response = null;
+ target = getTarget(path, username, password);
+ response = target.request().get();
+
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
}
+
+ responseData = response.readEntity(String.class);
+ return responseData;
+
}
private WebTarget getTarget(final String path, final String username, final String password) {
Client client = ClientBuilder.newClient();
-
- // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types.
- HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
- client.register(feature);
-
+ // Using UNIVERSAL as it supports both BASIC and DIGEST authentication
+ // types.
+ HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
+ client.register(feature);
+
return client.target(path);
}
-
private WebTarget getTarget(final String path) {
Client client = ClientBuilder.newClient();
return client.target(path);
}
+
private JSONObject getResponseDataInJson(Response response) throws JSONException {
try {
- MRClientFactory.HTTPHeadersMap=response.getHeaders();
- // fLog.info("DMAAP response status: " + response.getStatus());
-
-
- //MultivaluedMap<String, Object> headersMap = response.getHeaders();
- //for(String key : headersMap.keySet()) {
- String transactionid=response.getHeaderString("transactionid");
- if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+ // fLog.info("DMAAP response status: " + response.getStatus());
+
+ // MultivaluedMap<String, Object> headersMap =
+ // response.getHeaders();
+ // for(String key : headersMap.keySet()) {
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
fLog.info("TransactionId : " + transactionid);
}
- /*final String responseData = response.readEntity(String.class);
- JSONTokener jsonTokener = new JSONTokener(responseData);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else {
- jsonObject = new JSONObject(jsonTokener);
- }
-
- return jsonObject;*/
-
-
- if(response.getStatus()==403) {
+ /*
+ * final String responseData = response.readEntity(String.class);
+ * JSONTokener jsonTokener = new JSONTokener(responseData);
+ * JSONObject jsonObject = null; final char firstChar =
+ * jsonTokener.next(); jsonTokener.back(); if ('[' == firstChar) {
+ * JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject =
+ * new JSONObject(); jsonObject.put("result", jsonArray); } else {
+ * jsonObject = new JSONObject(jsonTokener); }
+ *
+ * return jsonObject;
+ */
+
+ if (response.getStatus() == 403) {
JSONObject jsonObject = null;
jsonObject = new JSONObject();
JSONArray jsonArray = new JSONArray();
@@ -339,11 +371,11 @@ public class MRBaseClient extends HttpClient implements MRClient
return jsonObject;
}
String responseData = response.readEntity(String.class);
-
+
JSONTokener jsonTokener = new JSONTokener(responseData);
JSONObject jsonObject = null;
final char firstChar = jsonTokener.next();
- jsonTokener.back();
+ jsonTokener.back();
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
@@ -361,35 +393,35 @@ public class MRBaseClient extends HttpClient implements MRClient
}
}
-
- public String getHTTPErrorResponseMessage(String responseString){
-
+
+ public String getHTTPErrorResponseMessage(String responseString) {
+
String response = null;
int beginIndex = 0;
int endIndex = 0;
- if(responseString.contains("<body>")){
-
- beginIndex = responseString.indexOf("body>")+5;
+ if (responseString.contains("<body>")) {
+
+ beginIndex = responseString.indexOf("body>") + 5;
endIndex = responseString.indexOf("</body");
- response = responseString.substring(beginIndex,endIndex);
+ response = responseString.substring(beginIndex, endIndex);
}
-
+
return response;
-
+
}
-
- public String getHTTPErrorResponseCode(String responseString){
-
+
+ public String getHTTPErrorResponseCode(String responseString) {
+
String response = null;
int beginIndex = 0;
int endIndex = 0;
- if(responseString.contains("<title>")){
- beginIndex = responseString.indexOf("title>")+6;
+ if (responseString.contains("<title>")) {
+ beginIndex = responseString.indexOf("title>") + 6;
endIndex = responseString.indexOf("</title");
- response = responseString.substring(beginIndex,endIndex);
+ response = responseString.substring(beginIndex, endIndex);
}
-
- return response;
+
+ return response;
}
-
+
}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
index eb7fd91..78f37fc 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
@@ -46,46 +46,43 @@ import org.slf4j.LoggerFactory;
import com.att.aft.dme2.api.DME2Client;
import com.att.aft.dme2.api.DME2Exception;
+import com.att.nsa.mr.client.HostSelector;
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
import com.att.nsa.mr.client.response.MRConsumerResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-public class MRConsumerImpl extends MRBaseClient implements MRConsumer
-{
-
+public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
+
private static final String SUCCESS_MESSAGE = "Success";
-
-
- private Logger log = LoggerFactory.getLogger ( this.getClass().getName () );
- public static List<String> stringToList ( String str )
- {
- final LinkedList<String> set = new LinkedList<String> ();
- if ( str != null )
- {
- final String[] parts = str.trim ().split ( "," );
- for ( String part : parts )
- {
+
+ private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+ public static List<String> stringToList(String str) {
+ final LinkedList<String> set = new LinkedList<String>();
+ if (str != null) {
+ final String[] parts = str.trim().split(",");
+ for (String part : parts) {
final String trimmed = part.trim();
- if ( trimmed.length () > 0 )
- {
- set.add ( trimmed );
+ if (trimmed.length() > 0) {
+ set.add(trimmed);
}
}
}
return set;
}
-
- public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
- {
- this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
- }
-
- public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
- {
- super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+ String apiSecret_password) throws MalformedURLException {
+ this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+ false);
+ }
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+ boolean allowSelfSignedCerts) throws MalformedURLException {
+ super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
fTopic = topic;
fGroup = consumerGroup;
@@ -94,233 +91,243 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
fLimit = limit;
fFilter = filter;
- //setApiCredentials ( apiKey, apiSecret );
+ fHostSelector = new HostSelector(hostPart);
}
@Override
- public Iterable<String> fetch () throws IOException,Exception
- {
+ public Iterable<String> fetch() throws IOException, Exception {
// fetch with the timeout and limit set in constructor
- return fetch ( fTimeoutMs, fLimit );
+ return fetch(fTimeoutMs, fLimit);
}
@Override
- public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
- {
- final LinkedList<String> msgs = new LinkedList<String> ();
-
-// FIXME: the timeout on the socket needs to be at least as long as the long poll
-// // sanity check for long poll timeout vs. socket read timeout
-// final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
-// if ( timeoutMs > maxReasonableTimeoutMs )
-// {
-// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
-// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
-// timeoutMs = maxReasonableTimeoutMs;
-// }
-
- // final String urlPath = createUrlPath ( timeoutMs, limit );
-
- //getLog().info ( "UEB GET " + urlPath );
- try
- {
+ public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
+ final LinkedList<String> msgs = new LinkedList<String>();
+
+ // FIXME: the timeout on the socket needs to be at least as long as the
+ // long poll
+ // // sanity check for long poll timeout vs. socket read timeout
+ // final int maxReasonableTimeoutMs =
+ // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
+ // if ( timeoutMs > maxReasonableTimeoutMs )
+ // {
+ // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
+ // socket read timeout (" +
+ // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
+ // timeout to " + maxReasonableTimeoutMs + "." );
+ // timeoutMs = maxReasonableTimeoutMs;
+ // }
+
+ // final String urlPath = createUrlPath ( timeoutMs, limit );
+
+ // getLog().info ( "UEB GET " + urlPath );
+ try {
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
DMEConfigure(timeoutMs, limit);
- try
- {
- //getLog().info ( "Receiving msgs from: " + url+subContextPath );
- String reply = sender.sendAndWait(timeoutMs+10000L);
- // System.out.println("Message received = "+reply);
- final JSONObject o =getResponseDataInJson(reply);
- //msgs.add(reply);
- if ( o != null )
- {
- final JSONArray a = o.getJSONArray ( "result" );
- // final int b = o.getInt("status" );
- //if ( a != null && a.length()>0 )
- if ( a != null)
- {
- for ( int i=0; i<a.length (); i++ )
- {
- //msgs.add("DMAAP response status: "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add ( a.getString(i) );
- else
- msgs.add ( a.getJSONObject(i).toString() );
-
-
+ try {
+ // getLog().info ( "Receiving msgs from: " +
+ // url+subContextPath );
+ String reply = sender.sendAndWait(timeoutMs + 10000L);
+ final JSONObject o = getResponseDataInJson(reply);
+ // msgs.add(reply);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ // final int b = o.getInt("status" );
+ // if ( a != null && a.length()>0 )
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ // msgs.add("DMAAP response status:
+ // "+Integer.toString(b));
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+
+ }
}
+ // else if(a != null && a.length()<1){
+ // msgs.add ("[]");
+ // }
}
-// else if(a != null && a.length()<1){
-// msgs.add ("[]");
-// }
- }
- }
- catch ( JSONException e )
- {
+ } catch (JSONException e) {
// unexpected response
- reportProblemWithResponse ();
- log.error("exception: ", e);
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
}
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
}
-
+
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
-
-
- try
- {
- final JSONObject o = get ( urlPath, username, password, protocolFlag );
-
- if ( o != null )
- {
- final JSONArray a = o.getJSONArray ( "result" );
- final int b = o.getInt("status" );
- //if ( a != null && a.length()>0 )
- if ( a != null)
- {
- for ( int i=0; i<a.length (); i++ )
- {
- msgs.add("DMAAP response status: "+Integer.toString(b));
+ // final String urlPath = createUrlPath
+ // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
+ // fId,props.getProperty("Protocol")), timeoutMs, limit );
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ final int b = o.getInt("status");
+ // if ( a != null && a.length()>0 )
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ // msgs.add("DMAAP response status:
+ // "+Integer.toString(b));
if (a.get(i) instanceof String)
- msgs.add ( a.getString(i) );
+ msgs.add(a.getString(i));
else
- msgs.add ( a.getJSONObject(i).toString() );
-
+ msgs.add(a.getJSONObject(i).toString());
+
}
}
-// else if(a != null && a.length()<1)
-// {
-// msgs.add ("[]");
-// }
+ // else if(a != null && a.length()<1)
+ // {
+ // msgs.add ("[]");
+ // }
}
- }
- catch ( JSONException e )
- {
+ } catch (JSONException e) {
// unexpected response
- reportProblemWithResponse ();
- log.error("exception: ", e);
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
}
- }
-
+ }
+
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
-
-
- try
- {
- final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
- if ( o != null )
- {
- final JSONArray a = o.getJSONArray ( "result" );
- final int b = o.getInt("status" );
- //if ( a != null && a.length()>0)
- if ( a != null)
- {
- for ( int i=0; i<a.length (); i++ )
- {
- msgs.add("DMAAP response status: "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add ( a.getString(i) );
- else
- msgs.add ( a.getJSONObject(i).toString() );
-
+ final String urlPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
+
+ try {
+ final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ final int b = o.getInt("status");
+ // if ( a != null && a.length()>0)
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ // msgs.add("DMAAP response status:
+ // "+Integer.toString(b));
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+
+ }
}
+ // else if(a != null && a.length()<1){
+ // msgs.add ("[]");
+ // }
}
-// else if(a != null && a.length()<1){
-// msgs.add ("[]");
-// }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
}
+
}
- catch ( JSONException e )
- {
- // unexpected response
- reportProblemWithResponse ();
- log.error("exception: ", e);
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
-
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ final int b = o.getInt("status");
+ // if ( a != null && a.length()>0)
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ // msgs.add("DMAAP response status:
+ // "+Integer.toString(b));
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+
+ }
+ }
+
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+
}
-
- } catch ( JSONException e ) {
+
+ } catch (JSONException e) {
// unexpected response
- reportProblemWithResponse ();
- log.error("exception: ", e);
+ reportProblemWithResponse();
+ log.error("exception: ", e);
} catch (HttpException e) {
throw new IOException(e);
- } catch (Exception e ) {
+ } catch (Exception e) {
throw e;
}
-
return msgs;
}
private JSONObject getResponseDataInJson(String response) {
- try {
-
-
- //log.info("DMAAP response status: " + response.getStatus());
+ try {
+
+ // log.info("DMAAP response status: " + response.getStatus());
- // final String responseData = response.readEntity(String.class);
+ // final String responseData = response.readEntity(String.class);
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ // log.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+
+ }
+
+ private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
JSONTokener jsonTokener = new JSONTokener(response);
JSONObject jsonObject = null;
final char firstChar = jsonTokener.next();
- jsonTokener.back();
+ jsonTokener.back();
+ if (null != response && response.length() == 0) {
+ return null;
+ }
+
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
jsonObject.put("result", jsonArray);
+ } else if ('{' == firstChar) {
+ return null;
+ } else if ('<' == firstChar) {
+ return null;
} else {
jsonObject = new JSONObject(jsonTokener);
}
return jsonObject;
- } catch (JSONException excp) {
- // log.error("DMAAP - Error reading response data.", excp);
- return null;
- }
-
-
-
-}
-
- private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
- JSONTokener jsonTokener = new JSONTokener(response);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if(null != response && response.length()==0){
- return null;
- }
-
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else if('{' == firstChar){
- return null;
- } else if('<' == firstChar){
- return null;
- }else{
- jsonObject = new JSONObject(jsonTokener);
- }
- return jsonObject;
-
}
-
+
private final String fTopic;
private final String fGroup;
private final String fId;
@@ -330,187 +337,184 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
private String username;
private String password;
private String host;
- private String latitude;
- private String longitude;
- private String version;
- private String serviceName;
- private String env;
- private String partner;
- private String routeOffer;
- private String subContextPath;
- private String protocol;
- private String methodType;
- private String url;
- private String dmeuser;
- private String dmepassword;
- private String contenttype;
- private DME2Client sender;
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- public String consumerFilePath;
- private String authKey;
- private String authDate;
- private Properties props;
- private HashMap<String, String> DMETimeOuts;
- private String handlers;
- public static final String routerFilePath = null;
- public static String getRouterFilePath() {
- return routerFilePath;
- }
-
- public static void setRouterFilePath(String routerFilePath) {
- MRSimplerBatchPublisher.routerFilePath = routerFilePath;
- }
- public String getConsumerFilePath() {
- return consumerFilePath;
- }
+ HostSelector fHostSelector = null;
+ private String latitude;
+ private String longitude;
+ private String version;
+ private String serviceName;
+ private String env;
+ private String partner;
+ private String routeOffer;
+ private String subContextPath;
+ private String protocol;
+ private String methodType;
+ private String url;
+ private String dmeuser;
+ private String dmepassword;
+ private String contenttype;
+ private DME2Client sender;
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String consumerFilePath;
+ private String authKey;
+ private String authDate;
+ private Properties props;
+ private HashMap<String, String> DMETimeOuts;
+ private String handlers;
+ public static final String routerFilePath = null;
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
- public void setConsumerFilePath(String consumerFilePath) {
- this.consumerFilePath = consumerFilePath;
- }
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
- public String getProtocolFlag() {
- return protocolFlag;
- }
+ public String getConsumerFilePath() {
+ return consumerFilePath;
+ }
+
+ public void setConsumerFilePath(String consumerFilePath) {
+ this.consumerFilePath = consumerFilePath;
+ }
- public void setProtocolFlag(String protocolFlag) {
- this.protocolFlag = protocolFlag;
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+ private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+ latitude = props.getProperty("Latitude");
+ longitude = props.getProperty("Longitude");
+ version = props.getProperty("Version");
+ serviceName = props.getProperty("ServiceName");
+ env = props.getProperty("Environment");
+ partner = props.getProperty("Partner");
+ routeOffer = props.getProperty("routeOffer");
+
+ subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+ // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
+ // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
+ // timeoutMs);
+
+ protocol = props.getProperty("Protocol");
+ methodType = props.getProperty("MethodType");
+ dmeuser = props.getProperty("username");
+ dmepassword = props.getProperty("password");
+ contenttype = props.getProperty("contenttype");
+ handlers = props.getProperty("sessionstickinessrequired");
+ // url =protocol+"://DME2SEARCH/"+
+ // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
+ // url = protocol +
+ // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover between
+ * data centers When Partner value is not provided use the routeOffer
+ * value for auto failover within a cluster
+ */
+
+ String preferredRouteKey = readRoute("preferredRouteKey");
+
+ if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+ + "&routeoffer=" + preferredRouteKey;
+ } else if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
}
-
- private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{
- latitude = props.getProperty("Latitude");
- longitude = props.getProperty("Longitude");
- version = props.getProperty("Version");
- serviceName = props.getProperty("ServiceName");
- env = props.getProperty("Environment");
- partner = props.getProperty("Partner");
- routeOffer = props.getProperty("routeOffer");
-
- subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
- // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
- //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
-
- protocol = props.getProperty("Protocol");
- methodType = props.getProperty("MethodType");
- dmeuser = props.getProperty("username");
- dmepassword = props.getProperty("password");
- contenttype = props.getProperty("contenttype");
- handlers = props.getProperty("sessionstickinessrequired");
- //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
- // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
-
- /**
- * Changes to DME2Client url to use Partner for auto failover between data centers
- * When Partner value is not provided use the routeOffer value for auto failover within a cluster
- */
-
- String preferredRouteKey = readRoute("preferredRouteKey");
-
- if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty())
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey;
- }else if (partner != null && !partner.isEmpty())
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
- }
- else if (routeOffer!=null && !routeOffer.isEmpty())
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
- }
-
- //log.info("url :"+url);
-
- if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
- if(limit != -1 )url=url+"&limit="+limit;
-
- DMETimeOuts = new HashMap<String, String>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- DMETimeOuts.put("Content-Type", contenttype);
- System.setProperty("AFT_LATITUDE", latitude);
- System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
- // System.setProperty("DME2.DEBUG", "true");
-
- //SSL changes
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- "SSLv3,TLSv1,TLSv1.1");
- System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
- System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
- //SSL changes
-
- sender = new DME2Client(new URI(url), timeoutMs+10000L);
- sender.setAllowAllHttpReturnCodes(true);
- sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
- if(dmeuser != null && dmepassword != null){
+
+ // log.info("url :"+url);
+
+ if (timeoutMs != -1)
+ url = url + "&timeout=" + timeoutMs;
+ if (limit != -1)
+ url = url + "&limit=" + limit;
+
+ // Add filter to DME2 Url
+ if (fFilter != null && fFilter.length() > 0)
+ url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+
+ DMETimeOuts = new HashMap<String, String>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contenttype);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ // SSL changes
+ // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+ // "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+ // SSL changes
+
+ sender = new DME2Client(new URI(url), timeoutMs + 10000L);
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ if (dmeuser != null && dmepassword != null) {
sender.setCredentials(dmeuser, dmepassword);
- //System.out.println(dmepassword);
- }
- sender.setHeaders(DMETimeOuts);
- sender.setPayload("");
-
- if(handlers.equalsIgnoreCase("yes")){
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- }else{
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
- /* HeaderReplyHandler headerhandler= new HeaderReplyHandler();
- sender.setReplyHandler(headerhandler);*/
-// } catch (DME2Exception x) {
-// getLog().warn(x.getMessage(), x);
-// System.out.println("XXXXXXXXXXXX"+x);
-// } catch (URISyntaxException x) {
-// System.out.println(x);
-// getLog().warn(x.getMessage(), x);
-// } catch (Exception x) {
-// System.out.println("XXXXXXXXXXXX"+x);
-// getLog().warn(x.getMessage(), x);
-// }
}
- public Properties getProps() {
- return props;
+ sender.setHeaders(DMETimeOuts);
+ sender.setPayload("");
+
+ if (handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
}
+ }
- public void setProps(Properties props) {
- this.props = props;
- }
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ }
- protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
- {
- final StringBuffer contexturl= new StringBuffer(url);
- // final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
- final StringBuffer adds = new StringBuffer ();
- if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs );
- if ( limit > -1 )
- {
- if ( adds.length () > 0 )
- {
- adds.append ( "&" );
+ protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+ final StringBuffer contexturl = new StringBuffer(url);
+ // final StringBuffer url = new StringBuffer (
+ // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
+ final StringBuffer adds = new StringBuffer();
+ if (timeoutMs > -1)
+ adds.append("timeout=").append(timeoutMs);
+ if (limit > -1) {
+ if (adds.length() > 0) {
+ adds.append("&");
}
- adds.append ( "limit=" ).append ( limit );
+ adds.append("limit=").append(limit);
}
- if ( fFilter != null && fFilter.length () > 0 )
- {
+ if (fFilter != null && fFilter.length() > 0) {
try {
- if ( adds.length () > 0 )
- {
- adds.append ( "&" );
+ if (adds.length() > 0) {
+ adds.append("&");
}
adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
}
}
- if ( adds.length () > 0 )
- {
- contexturl.append ( "?" ).append ( adds.toString () );
+ if (adds.length() > 0) {
+ contexturl.append("?").append(adds.toString());
}
-
- //sender.setSubContext(url.toString());
- return contexturl.toString ();
+
+ // sender.setSubContext(url.toString());
+ return contexturl.toString();
}
public String getUsername() {
@@ -560,20 +564,20 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
public void setfFilter(String fFilter) {
this.fFilter = fFilter;
}
-
+
private String readRoute(String routeKey) {
try {
-
- MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath)));
+
+ MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
} catch (Exception ex) {
- log.error("Reply Router Error " + ex.toString() );
+ log.error("Reply Router Error " + ex.toString());
}
- String routeOffer = MRClientFactory.prop.getProperty(routeKey);
+ String routeOffer = MRClientFactory.prop.getProperty(routeKey);
return routeOffer;
}
-
+
@Override
public MRConsumerResponse fetchWithReturnConsumerResponse() {
@@ -582,13 +586,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
}
@Override
- public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
- int limit) {
+ public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
final LinkedList<String> msgs = new LinkedList<String>();
MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
- protocolFlag)) {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
DMEConfigure(timeoutMs, limit);
String reply = sender.sendAndWait(timeoutMs + 10000L);
@@ -599,7 +601,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
final JSONArray a = o.getJSONArray("result");
if (a != null) {
- for (int i = 0; i < a.length(); i++) {
+ for (int i = 0; i < a.length(); i++) {
if (a.get(i) instanceof String)
msgs.add(a.getString(i));
else
@@ -612,15 +614,16 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
createMRConsumerResponse(reply, mrConsumerResponse);
}
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
- protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
- props.getProperty("Protocol")), timeoutMs,
- limit);
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ /*
+ * final String urlPath = createUrlPath(
+ * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+ * props.getProperty("Protocol")), timeoutMs, limit);
+ */
- String response = getResponse(urlPath, username, password,
- protocolFlag);
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+ String response = getResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
@@ -628,7 +631,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
final JSONArray a = o.getJSONArray("result");
if (a != null) {
- for (int i = 0; i < a.length(); i++) {
+ for (int i = 0; i < a.length(); i++) {
if (a.get(i) instanceof String)
msgs.add(a.getString(i));
else
@@ -641,16 +644,13 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
createMRConsumerResponse(response, mrConsumerResponse);
}
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
- protocolFlag)) {
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
- props.getProperty("Protocol")), timeoutMs,
- limit);
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
- String response = getAuthResponse(urlPath, authKey, authDate,
- username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
if (o != null) {
final JSONArray a = o.getJSONArray("result");
@@ -667,51 +667,74 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
}
createMRConsumerResponse(response, mrConsumerResponse);
}
-
-
-
- } catch (JSONException e) {
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ // final String urlPath = createUrlPath(
+ // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+ // props.getProperty("Protocol")), timeoutMs,
+ // limit);
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+
+ }
+ }
+
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ } catch (JSONException e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("json exception: ", e);
- } catch (HttpException e) {
+ log.error("json exception: ", e);
+ } catch (HttpException e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("http exception: ", e);
- }catch(DME2Exception e){
+ log.error("http exception: ", e);
+ } catch (DME2Exception e) {
mrConsumerResponse.setResponseCode(e.getErrorCode());
mrConsumerResponse.setResponseMessage(e.getErrorMessage());
- log.error("DME2 exception: ", e);
- }catch (Exception e) {
+ log.error("DME2 exception: ", e);
+ } catch (Exception e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("exception: ", e);
+ log.error("exception: ", e);
}
mrConsumerResponse.setActualMessages(msgs);
return mrConsumerResponse;
}
private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
-
- if(reply.startsWith("{")){
+
+ if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
String message = jObject.getString("message");
int status = jObject.getInt("status");
-
+
mrConsumerResponse.setResponseCode(Integer.toString(status));
-
- if(null != message){
- mrConsumerResponse.setResponseMessage(message);
- }
- }else if (reply.startsWith("<")){
+
+ if (null != message) {
+ mrConsumerResponse.setResponseMessage(message);
+ }
+ } else if (reply.startsWith("<")) {
mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
- mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- }else{
+ mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ } else {
mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
+ mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
}
-
+
}
-
}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
index 398558d..db982ec 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -47,6 +47,7 @@ import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONObject;
+import org.json.JSONTokener;
import com.att.aft.dme2.api.DME2Client;
import com.att.aft.dme2.api.DME2Exception;
@@ -55,76 +56,66 @@ import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.response.MRPublisherResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
-{
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
- public static class Builder
- {
- public Builder ()
- {
+ public static class Builder {
+ public Builder() {
}
- public Builder againstUrls ( Collection<String> baseUrls )
- {
+ public Builder againstUrls(Collection<String> baseUrls) {
fUrls = baseUrls;
return this;
}
- public Builder onTopic ( String topic )
- {
+ public Builder onTopic(String topic) {
fTopic = topic;
return this;
}
- public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
- {
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
fMaxBatchSize = maxBatchSize;
fMaxBatchAgeMs = maxBatchAgeMs;
return this;
}
- public Builder compress ( boolean compress )
- {
+ public Builder compress(boolean compress) {
fCompress = compress;
return this;
}
-
- public Builder httpThreadTime ( int threadOccuranceTime )
- {
+
+ public Builder httpThreadTime(int threadOccuranceTime) {
this.threadOccuranceTime = threadOccuranceTime;
return this;
}
-
- public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
- {
+
+ public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
fAllowSelfSignedCerts = allowSelfSignedCerts;
return this;
}
-
- public Builder withResponse ( boolean withResponse)
- {
+
+ public Builder withResponse(boolean withResponse) {
fWithResponse = withResponse;
return this;
}
- public MRSimplerBatchPublisher build ()
- {
- if(!fWithResponse)
- {
+
+ public MRSimplerBatchPublisher build() {
+ if (!fWithResponse) {
try {
- return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, threadOccuranceTime);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- } else
- {
+ } else {
try {
- return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, fMaxBatchSize);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
-
+
}
private Collection<String> fUrls;
@@ -135,262 +126,250 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
private int threadOccuranceTime = 50;
private boolean fAllowSelfSignedCerts = false;
private boolean fWithResponse = false;
-
+
};
@Override
- public int send ( String partition, String msg )
- {
- return send ( new message ( partition, msg ) );
+ public int send(String partition, String msg) {
+ return send(new message(partition, msg));
}
+
@Override
- public int send ( String msg )
- {
- return send ( new message ( null, msg ) );
+ public int send(String msg) {
+ return send(new message(null, msg));
}
-
@Override
- public int send ( message msg )
- {
- final LinkedList<message> list = new LinkedList<message> ();
- list.add ( msg );
- return send ( list );
+ public int send(message msg) {
+ final LinkedList<message> list = new LinkedList<message>();
+ list.add(msg);
+ return send(list);
}
-
-
@Override
- public synchronized int send ( Collection<message> msgs )
- {
- if ( fClosed )
- {
- throw new IllegalStateException ( "The publisher was closed." );
+ public synchronized int send(Collection<message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
}
-
- for ( message userMsg : msgs )
- {
- fPending.add ( new TimestampedMessage ( userMsg ) );
+
+ for (message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
}
- return getPendingMessageCount ();
+ return getPendingMessageCount();
}
@Override
- public synchronized int getPendingMessageCount ()
- {
- return fPending.size ();
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
}
@Override
- public void close ()
- {
- try
- {
- final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
- if ( remains.size() > 0 )
- {
- getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+ public void close() {
+ try {
+ final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.size() > 0) {
+ getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
- }
- catch ( InterruptedException e )
- {
- getLog().warn ( "Possible message loss. " + e.getMessage(), e );
- }
- catch ( IOException e )
- {
- getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
}
}
@Override
- public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
- {
- synchronized ( this )
- {
+ public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
fClosed = true;
// stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
- fExec.shutdown ();
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
}
- final long now = Clock.now ();
- final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
final long timeoutAtMs = now + waitInMs;
- while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
- {
- send ( true );
- Thread.sleep ( 250 );
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
}
- synchronized ( this )
- {
- final LinkedList<message> result = new LinkedList<message> ();
- fPending.drainTo ( result );
+ synchronized (this) {
+ final LinkedList<message> result = new LinkedList<message>();
+ fPending.drainTo(result);
return result;
}
}
/**
- * Possibly send a batch to the MR server. This is called by the background thread
- * and the close() method
+ * Possibly send a batch to the MR server. This is called by the background
+ * thread and the close() method
*
* @param force
*/
- private synchronized void send ( boolean force )
- {
- if ( force || shouldSendNow () )
- {
- if ( !sendBatch () )
- {
- getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+ private synchronized void send(boolean force) {
+ if (force || shouldSendNow()) {
+ if (!sendBatch()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
// note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+ fDontSendUntilMs = sfWaitAfterError + Clock.now();
}
}
}
- private synchronized boolean shouldSendNow ()
- {
+ private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if ( fPending.size () > 0 )
- {
- final long nowMs = Clock.now ();
+ if (fPending.size() > 0) {
+ final long nowMs = Clock.now();
- shouldSend = ( fPending.size() >= fMaxBatchSize );
- if ( !shouldSend )
- {
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
shouldSend = sendAtMs <= nowMs;
}
// however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
}
return shouldSend;
}
- private synchronized boolean sendBatch ()
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( fPending.size() < 1 )
- {
+ /**
+ * Method to parse published JSON Objects and Arrays
+ *
+ * @return JSONArray
+ */
+ private JSONArray parseJSON() {
+ JSONArray jsonArray = new JSONArray();
+ for (TimestampedMessage m : fPending) {
+ JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+ JSONObject jsonObject = null;
+ JSONArray tempjsonArray = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ tempjsonArray = new JSONArray(jsonTokener);
+ if (null != tempjsonArray) {
+ for (int i = 0; i < tempjsonArray.length(); i++) {
+ jsonArray.put(tempjsonArray.getJSONObject(i));
+ }
+ }
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonArray.put(jsonObject);
+ }
+
+ }
+ return jsonArray;
+ }
+
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
return true;
}
- final long nowMs = Clock.now ();
-
+ final long nowMs = Clock.now();
+
host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-
-
- try
- {
- /*final String contentType =
- fCompress ?
- MRFormat.CAMBRIA_ZIP.toString () :
- MRFormat.CAMBRIA.toString ()
- ;*/
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+
+ try {
+ /*
+ * final String contentType = fCompress ?
+ * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
+ */
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
final String contentType = props.getProperty("contenttype");
- if(contentType.equalsIgnoreCase("application/json")){
- JSONArray jsonArray = new JSONArray();
- for ( TimestampedMessage m : fPending )
- {
- JSONObject jsonObject = new JSONObject(m.fMsg);
-
- jsonArray.put(jsonObject);
-
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ os.close();
+
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
- os.write (jsonArray.toString().getBytes() );
os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
- }else if (contentType.equalsIgnoreCase("text/plain")){
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
- if ( contentType.equalsIgnoreCase("application/cambria-zip") )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : fPending )
- {
-
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }else{
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
-
- }
- os.close ();
}
-
-
+ os.close();
+ }
- final long startMs = Clock.now ();
+ final long startMs = Clock.now();
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
-
+
DME2Configue();
-
+
Thread.sleep(5);
- getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- sender.setPayload(os.toString());
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
-
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
- + dmeResponse.toString();
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
getLog().info(logLine);
fPending.clear();
return true;
- }
-
+ }
+
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+ username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
return false;
}
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
getLog().info(logLine);
fPending.clear();
return true;
- }
-
+ }
+
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-
-
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+ protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
return false;
}
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
@@ -398,118 +377,100 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
fPending.clear();
return true;
}
- }
- catch ( IllegalArgumentException x ) {
- getLog().warn ( x.getMessage(), x );
- } catch ( IOException x ) {
- getLog().warn ( x.getMessage(), x );
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
} catch (HttpException x) {
- getLog().warn ( x.getMessage(), x );
+ getLog().warn(x.getMessage(), x);
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
}
return false;
}
- public synchronized MRPublisherResponse sendBatchWithResponse ()
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( fPending.size() < 1 )
- {
+ public synchronized MRPublisherResponse sendBatchWithResponse() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage("No Messages to send");
return pubResponse;
}
- final long nowMs = Clock.now ();
-
+ final long nowMs = Clock.now();
+
host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
- OutputStream os=null;
- try
- {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
- os = baseStream;
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+ OutputStream os = null;
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ os = baseStream;
final String contentType = props.getProperty("contenttype");
- if(contentType.equalsIgnoreCase("application/json")){
- JSONArray jsonArray = new JSONArray();
- for ( TimestampedMessage m : fPending )
- {
- JSONObject jsonObject = new JSONObject(m.fMsg);
-
- jsonArray.put(jsonObject);
-
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
- os.write (jsonArray.toString().getBytes() );
- }else if (contentType.equalsIgnoreCase("text/plain")){
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
- if ( contentType.equalsIgnoreCase("application/cambria-zip") )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : fPending )
- {
-
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }else{
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
-
- }
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
-
-
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
- final long startMs = Clock.now ();
+ }
+ }
+
+ final long startMs = Clock.now();
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
-
+
try {
- DME2Configue();
-
- Thread.sleep(5);
- getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- sender.setPayload(os.toString());
-
-
- String dmeResponse = sender.sendAndWait(5000L);
- System.out.println("dmeres->"+dmeResponse);
-
-
- pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
- final String logLine = String.valueOf((Clock.now() - startMs))
- + dmeResponse.toString();
- getLog().info(logLine);
- fPending.clear();
-
- }
- catch (DME2Exception x) {
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
+
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+ final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+
+ } catch (DME2Exception x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(x.getErrorCode());
pubResponse.setResponseMessage(x.getErrorMessage());
} catch (URISyntaxException x) {
-
+
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
@@ -517,135 +478,127 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
- logger.error("exception: ", x);
-
+ logger.error("exception: ", x);
+
}
-
+
return pubResponse;
- }
-
+ }
+
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
-
-
- pubResponse = createMRPublisherResponse(result,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+ authDate, username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
return pubResponse;
}
-
+
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
getLog().info(logLine);
fPending.clear();
return pubResponse;
- }
-
+ }
+
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+ password, protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
return pubResponse;
}
-
+
final String logLine = String.valueOf((Clock.now() - startMs));
getLog().info(logLine);
fPending.clear();
return pubResponse;
}
- }
- catch ( IllegalArgumentException x ) {
- getLog().warn ( x.getMessage(), x );
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
-
- } catch ( IOException x ) {
- getLog().warn ( x.getMessage(), x );
+
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
+
} catch (HttpException x) {
- getLog().warn ( x.getMessage(), x );
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
-
+
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
-
+
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
+
}
-
+
finally {
- if (fPending.size()>0) {
- getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+ if (fPending.size() > 0) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
pubResponse.setPendingMsgs(fPending.size());
}
if (os != null) {
try {
- os.close();
+ os.close();
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage("Error in closing Output Stream");
}
- }
+ }
}
-
+
return pubResponse;
}
-
-private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-
- if (reply.isEmpty())
- {
-
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- mrPubResponse.setResponseMessage("Please verify the Producer properties");
- }
- else if(reply.startsWith("{"))
- {
+
+ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+ if (reply.isEmpty()) {
+
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ mrPubResponse.setResponseMessage("Please verify the Producer properties");
+ } else if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
- if(jObject.has("message") && jObject.has("status"))
- {
+ if (jObject.has("message") && jObject.has("status")) {
String message = jObject.getString("message");
- if(null != message)
- {
- mrPubResponse.setResponseMessage(message);
+ if (null != message) {
+ mrPubResponse.setResponseMessage(message);
}
mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ } else {
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrPubResponse.setResponseMessage(reply);
}
- else
- {
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrPubResponse.setResponseMessage(reply);
- }
- }
- else if (reply.startsWith("<"))
- {
- String responseCode = getHTTPErrorResponseCode(reply);
- if( responseCode.contains("403"))
- {
- responseCode = "403";
- }
- mrPubResponse.setResponseCode(responseCode);
- mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- }
-
+ } else if (reply.startsWith("<")) {
+ String responseCode = getHTTPErrorResponseCode(reply);
+ if (responseCode.contains("403")) {
+ responseCode = "403";
+ }
+ mrPubResponse.setResponseCode(responseCode);
+ mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }
+
return mrPubResponse;
}
@@ -658,10 +611,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
private String username;
private String password;
private String host;
-
- //host selector
+
+ // host selector
private HostSelector fHostSelector = null;
-
+
private final LinkedBlockingQueue<TimestampedMessage> fPending;
private long fDontSendUntilMs;
private final ScheduledThreadPoolExecutor fExec;
@@ -684,25 +637,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
private HashMap<String, String> DMETimeOuts;
private DME2Client sender;
public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- public String producerFilePath;
private String authKey;
private String authDate;
private String handlers;
private Properties props;
public static String routerFilePath;
- protected static final Map<String, String> headers=new HashMap<String, String>();
+ protected static final Map<String, String> headers = new HashMap<String, String>();
public static MultivaluedMap<String, Object> headersMap;
-
-
+
private MRPublisherResponse pubResponse;
-
+
public MRPublisherResponse getPubResponse() {
return pubResponse;
}
+
public void setPubResponse(MRPublisherResponse pubResponse) {
this.pubResponse = pubResponse;
}
-
+
public static String getRouterFilePath() {
return routerFilePath;
}
@@ -719,14 +671,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
this.props = props;
}
- public String getProducerFilePath() {
- return producerFilePath;
- }
-
- public void setProducerFilePath(String producerFilePath) {
- this.producerFilePath = producerFilePath;
- }
-
public String getProtocolFlag() {
return protocolFlag;
}
@@ -734,14 +678,14 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
public void setProtocolFlag(String protocolFlag) {
this.protocolFlag = protocolFlag;
}
-
-
+
private void DME2Configue() throws Exception {
try {
-
- /* FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
- props.load(reader);*/
+
+ /*
+ * FileReader reader = new FileReader(new File (producerFilePath));
+ * Properties props = new Properties(); props.load(reader);
+ */
latitude = props.getProperty("Latitude");
longitude = props.getProperty("Longitude");
version = props.getProperty("Version");
@@ -749,41 +693,43 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
env = props.getProperty("Environment");
partner = props.getProperty("Partner");
routeOffer = props.getProperty("routeOffer");
- subContextPath = props.getProperty("SubContextPath")+fTopic;
- /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
- subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
- }*/
+ subContextPath = props.getProperty("SubContextPath") + fTopic;
+ /*
+ * if(props.getProperty("partition")!=null &&
+ * !props.getProperty("partition").equalsIgnoreCase("")){
+ * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
+ * "partition"); }
+ */
protocol = props.getProperty("Protocol");
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
dmepassword = props.getProperty("password");
contentType = props.getProperty("contenttype");
handlers = props.getProperty("sessionstickinessrequired");
- routerFilePath= props.getProperty("DME2preferredRouterFilePath");
-
+ routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
/**
- * Changes to DME2Client url to use Partner for auto failover between data centers
- * When Partner value is not provided use the routeOffer value for auto failover within a cluster
+ * Changes to DME2Client url to use Partner for auto failover
+ * between data centers When Partner value is not provided use the
+ * routeOffer value for auto failover within a cluster
*/
-
String partitionKey = props.getProperty("partition");
-
- if (partner != null && !partner.isEmpty() )
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
- if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
- url = url + "&partitionKey=" + partitionKey;
- }
- }
- else if (routeOffer!=null && !routeOffer.isEmpty())
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
- if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
- url = url + "&partitionKey=" + partitionKey;
- }
+
+ if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+ + partner;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
}
-
+
DMETimeOuts = new HashMap<String, String>();
DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
@@ -791,56 +737,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
DMETimeOuts.put("Content-Type", contentType);
System.setProperty("AFT_LATITUDE", latitude);
System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
- //System.setProperty("DME2.DEBUG", "true");
- // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
- //System.out.println("XXXXXX"+url);
-
- //SSL changes
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ // SSL changes
+ // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+ // "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-
- //SSL changes
-
+
+ // SSL changes
+
sender = new DME2Client(new URI(url), 5000L);
-
+
sender.setAllowAllHttpReturnCodes(true);
sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
+ sender.setSubContext(subContextPath);
sender.setCredentials(dmeuser, dmepassword);
sender.setHeaders(DMETimeOuts);
- if(handlers.equalsIgnoreCase("yes")){
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ if (handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- }else{
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
} catch (DME2Exception x) {
getLog().warn(x.getMessage(), x);
- throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+ throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
} catch (URISyntaxException x) {
-
+
getLog().warn(x.getMessage(), x);
- throw new URISyntaxException(url,x.getMessage());
+ throw new URISyntaxException(url, x.getMessage());
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
throw new Exception(x.getMessage());
}
}
-
- private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
- {
- super ( hosts );
- if ( topic == null || topic.length() < 1 )
- {
- throw new IllegalArgumentException ( "A topic must be provided." );
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
}
-
+
fHostSelector = new HostSelector(hosts, null);
fClosed = false;
fTopic = topic;
@@ -848,49 +794,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor ( 1 );
+ fExec = new ScheduledThreadPoolExecutor(1);
pubResponse = new MRPublisherResponse();
-
+
}
-
- private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
- {
- super ( hosts );
- if ( topic == null || topic.length() < 1 )
- {
- throw new IllegalArgumentException ( "A topic must be provided." );
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
}
-
+
fHostSelector = new HostSelector(hosts, null);
fClosed = false;
fTopic = topic;
fMaxBatchSize = maxBatchSize;
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- threadOccuranceTime=httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ threadOccuranceTime = httpThreadOccurnace;
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor ( 1 );
- fExec.scheduleAtFixedRate ( new Runnable()
- {
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
@Override
- public void run ()
- {
- send ( false );
+ public void run() {
+ send(false);
}
- }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+ }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
}
- private static class TimestampedMessage extends message
- {
- public TimestampedMessage ( message m )
- {
- super ( m );
+ private static class TimestampedMessage extends message {
+ public TimestampedMessage(message m) {
+ super(m);
timestamp = Clock.now();
}
+
public final long timestamp;
}
@@ -941,5 +883,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
public void setAuthDate(String authDate) {
this.authDate = authDate;
}
-
+
}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
index bdd15d4..2886db5 100644
--- a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
+++ b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
@@ -38,14 +38,14 @@ public class SimpleExampleConsumer {
private static final Logger logger = LoggerFactory.getLogger(SimpleExampleConsumer.class);
- private SimpleExampleConsumer() {
- }
+ private SimpleExampleConsumer() {
+ }
public static void main(String[] args) {
long count = 0;
long nextReport = 5000;
- String key;
+ String key;
final long startMs = System.currentTimeMillis();
@@ -54,24 +54,24 @@ public class SimpleExampleConsumer {
final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties");
while (true) {
for (String msg : cc.fetch()) {
- logger.debug("Message Received: " + msg);
+ logger.debug("Message Received: " + msg);
}
// Header for DME2 Call.
MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
- for (MultivaluedMap.Entry<String,List<Object>> entry: headersMap.entrySet()) {
- key = entry.getKey();
- logger.debug("Header Key " + key);
- logger.debug("Header Value " + headersMap.get(key));
+ for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) {
+ key = entry.getKey();
+ logger.debug("Header Key " + key);
+ logger.debug("Header Value " + headersMap.get(key));
}
// Header for HTTP Call.
-
- Map<String, String> dme2headersMap=MRClientFactory.DME2HeadersMap;
- for(Map.Entry<String,String> entry: dme2headersMap.entrySet()) {
- key = entry.getKey();
- logger.debug("Header Key " + key);
- logger.debug("Header Value " + dme2headersMap.get(key));
- }
-
+
+ Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
+ for (Map.Entry<String, String> entry : dme2headersMap.entrySet()) {
+ key = entry.getKey();
+ logger.debug("Header Key " + key);
+ logger.debug("Header Value " + dme2headersMap.get(key));
+ }
+
if (count > nextReport) {
nextReport += 5000;
@@ -79,11 +79,10 @@ public class SimpleExampleConsumer {
final long elapsedMs = endMs - startMs;
final double elapsedSec = elapsedMs / 1000.0;
final double eps = count / elapsedSec;
- logger.error("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
}
}
} catch (Exception x) {
- logger.error(x.getClass().getName() + ": " + x.getMessage());
+ logger.error(x.getClass().getName() + ": " + x.getMessage());
}
}
}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
index 6e86d47..a4a176e 100644
--- a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
+++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
@@ -29,11 +29,9 @@ package com.att.nsa.mr.test.clients;
*
*/
public enum ProtocolTypeConstants {
-
- DME2("DME2"),
- AAF_AUTH("HTTPAAF"),
- AUTH_KEY("HTTPAUTH");
-
+
+ DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
+
private String value;
private ProtocolTypeConstants(String value) {
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
index 5ae36d2..0e3ee5a 100644
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
+++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
@@ -33,57 +33,52 @@ import org.slf4j.LoggerFactory;
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
-public class SimpleExampleConsumer
-{
+public class SimpleExampleConsumer {
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
- public static void main ( String[] args )
- {
+ static FileWriter routeWriter = null;
+ static Properties props = null;
+ static FileReader routeReader = null;
+
+ public static void main(String[] args) {
final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-
+
long count = 0;
long nextReport = 5000;
- final long startMs = System.currentTimeMillis ();
-
- try
- {
- String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
-
-
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" );
- while ( true )
- {
- for ( String msg : cc.fetch () )
- {
- //System.out.println ( "" + (++count) + ": " + msg );
+ final long startMs = System.currentTimeMillis();
+
+ try {
+ String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
+
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ routeReader = new FileReader(new File(routeFilePath));
+ props = new Properties();
+ final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
+ int i = 0;
+ while (i < 10) {
+ Thread.sleep(2);
+ i++;
+ for (String msg : cc.fetch()) {
+ // System.out.println ( "" + (++count) + ": " + msg );
System.out.println(msg);
}
-
- if ( count > nextReport )
- {
+
+ if (count > nextReport) {
nextReport += 5000;
-
- final long endMs = System.currentTimeMillis ();
+
+ final long endMs = System.currentTimeMillis();
final long elapsedMs = endMs - startMs;
final double elapsedSec = elapsedMs / 1000.0;
final double eps = count / elapsedSec;
- System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+ System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
}
}
- }
- catch ( Exception x )
- {
- System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
- LOG.error("exception: ", x);
+ } catch (Exception x) {
+ System.err.println(x.getClass().getName() + ": " + x.getMessage());
+ LOG.error("exception: ", x);
}
}
}