aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java499
1 files changed, 230 insertions, 269 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
index 1780703..88d3dab 100644
--- a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
@@ -5,12 +5,13 @@
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Modifications Copyright © 2018 IBM.
- * ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,22 +20,30 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.UUID;
import javax.ws.rs.core.MultivaluedMap;
+
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRMetaClient;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.onap.dmaap.mr.tools.ValidatorUtil;
/**
@@ -50,10 +59,12 @@ import org.onap.dmaap.mr.tools.ValidatorUtil;
* instance.)<br/>
* <br/>
* Publishers
- *
+ *
* @author author
*/
public class MRClientFactory {
+
+ private static final String ID = "id";
private static final String AUTH_KEY = "authKey";
private static final String AUTH_DATE = "authDate";
private static final String PASSWORD = "password";
@@ -63,6 +74,36 @@ public class MRClientFactory {
private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
private static final String TOPIC = "topic";
private static final String TRANSPORT_TYPE = "TransportType";
+ private static final String MAX_BATCH_SIZE = "maxBatchSize";
+ private static final String MAX_AGE_MS = "maxAgeMs";
+ private static final String MESSAGE_SENT_THREAD_OCCURRENCE_OLD = "MessageSentThreadOccurance";
+ private static final String MESSAGE_SENT_THREAD_OCCURRENCE = "MessageSentThreadOccurrence";
+ private static final String GROUP = "group";
+ private static final String SERVICE_NAME = "ServiceName";
+ private static final String PARTNER = "Partner";
+ private static final String ROUTE_OFFER = "routeOffer";
+ private static final String PROTOCOL = "Protocol";
+ private static final String METHOD_TYPE = "MethodType";
+ private static final String CONTENT_TYPE = "contenttype";
+ private static final String LATITUDE = "Latitude";
+ private static final String LONGITUDE = "Longitude";
+ private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
+ private static final String VERSION = "Version";
+ private static final String ENVIRONMENT = "Environment";
+ private static final String SUB_CONTEXT_PATH = "SubContextPath";
+ private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
+ private static final String PARTITION = "partition";
+ private static final String COMPRESS = "compress";
+ private static final String TIMEOUT = "timeout";
+ private static final String LIMIT = "limit";
+ private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
+ private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
+ private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
+ private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
+ private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
+ private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
private static MultivaluedMap<String, Object> httpHeadersMap;
public static Map<String, String> DME2HeadersMap;
@@ -82,6 +123,7 @@ public class MRClientFactory {
/**
* Add getter to avoid direct access to static header map.
+ *
* @return
*/
public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
@@ -90,6 +132,7 @@ public class MRClientFactory {
/**
* Add setter to avoid direct access to static header map.
+ *
* @param headers
*/
public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
@@ -100,15 +143,11 @@ public class MRClientFactory {
* Create a consumer instance with the default timeout and no limit on
* messages returned. This consumer operates as an independent consumer
* (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "hostname:8080,"
- *
- * @param topic
- * The topic to consume
- *
+ *
+ * @param hostList A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "hostname:8080,"
+ * @param topic The topic to consume
* @return a consumer
*/
public static MRConsumer createConsumer(String hostList, String topic) {
@@ -119,12 +158,9 @@ public class MRClientFactory {
* Create a consumer instance with the default timeout and no limit on
* messages returned. This consumer operates as an independent consumer
* (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- *
+ *
+ * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+ * @param topic The topic to consume
* @return a consumer
*/
public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
@@ -136,14 +172,10 @@ public class MRClientFactory {
* timeout, and no limit on messages returned. This consumer operates as an
* independent consumer (i.e., not in a group) and is NOT re-startable
* across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param filter
- * a filter to use on the server side
- *
+ *
+ * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+ * @param topic The topic to consume
+ * @param filter a filter to use on the server side
* @return a consumer
*/
public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
@@ -155,20 +187,15 @@ public class MRClientFactory {
* messages returned. This consumer can operate in a logical group and is
* re-startable across sessions when you use the same group and ID on
* restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- *
+ *
+ * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+ * @param topic The topic to consume
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consume in its group
* @return a consumer
*/
public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId) {
+ final String consumerId) {
return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
}
@@ -177,27 +204,20 @@ public class MRClientFactory {
* messages returned. This consumer can operate in a logical group and is
* re-startable across sessions when you use the same group and ID on
* restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- *
+ *
+ * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+ * @param topic The topic to consume
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consume in its group
+ * @param timeoutMs The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
* @return a consumer
*/
public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit) {
+ final String consumerId, int timeoutMs, int limit) {
return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
}
@@ -206,31 +226,23 @@ public class MRClientFactory {
* messages returned. This consumer can operate in a logical group and is
* re-startable across sessions when you use the same group and ID on
* restart. This consumer also uses server-side filtering.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default)"
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
+ *
+ * @param hostList A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default)"
+ * @param topic The topic to consume
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consume in its group
+ * @param timeoutMs The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
* @return a consumer
*/
public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
filter, apiKey, apiSecret);
}
@@ -240,32 +252,25 @@ public class MRClientFactory {
* messages returned. This consumer can operate in a logical group and is
* re-startable across sessions when you use the same group and ID on
* restart. This consumer also uses server-side filtering.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
+ *
+ * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+ * @param topic The topic to consume
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consume in its group
+ * @param timeoutMs The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
* @return a consumer
*/
public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
- if (MRClientBuilders.sfConsumerMock != null)
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ if (MRClientBuilders.sfConsumerMock != null) {
return MRClientBuilders.sfConsumerMock;
+ }
try {
return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
.setConsumerGroup(consumerGroup).setConsumerId(consumerId)
@@ -277,21 +282,19 @@ public class MRClientFactory {
}
}
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
+ //*************************************************************************
+ //*************************************************************************
+ //*************************************************************************
/**
* Create a publisher that sends each message (or group of messages)
* immediately. Most applications should favor higher latency for much
* higher message throughput and the "simple publisher" is not a good
* choice.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
+ *
+ * @param hostlist The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic The topic on which to publish messages.
* @return a publisher
*/
public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
@@ -302,94 +305,69 @@ public class MRClientFactory {
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown. Message payloads are
* not compressed.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- *
+ *
+ * @param hostlist The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic The topic on which to publish messages.
+ * @param maxBatchSize The largest set of messages to batch
+ * @param maxAgeMs The maximum age of a message waiting in a batch
* @return a publisher
*/
public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs) {
+ long maxAgeMs) {
return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
}
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
+ *
+ * @param hostlist The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic The topic on which to publish messages.
+ * @param maxBatchSize The largest set of messages to batch
+ * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param compress use gzip compression
* @return a publisher
*/
public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
+ long maxAgeMs, boolean compress) {
return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
}
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
+ *
+ * @param hostSet A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic The topic on which to publish messages.
+ * @param maxBatchSize The largest set of messages to batch
+ * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param compress use gzip compression
* @return a publisher
*/
public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
+ long maxAgeMs, boolean compress) {
final TreeSet<String> hosts = new TreeSet<>();
- for (String hp : hostSet) {
- hosts.add(hp);
- }
+ Collections.addAll(hosts, hostSet);
return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
}
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
+ *
+ * @param hostSet A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic The topic on which to publish messages.
+ * @param maxBatchSize The largest set of messages to batch
+ * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param compress use gzip compression
* @return a publisher
*/
public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
- int maxBatchSize, long maxAgeMs, boolean compress) {
+ int maxBatchSize, long maxAgeMs, boolean compress) {
return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
.compress(compress).build();
}
@@ -397,28 +375,20 @@ public class MRClientFactory {
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown.
- *
- * @param host
- * A host to be used in the URL to MR. Can be "host:port". Use
- * multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param username
- * username
- * @param password
- * password
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- * @param protocolFlag
- * http auth or ueb auth or dme2 method
+ *
+ * @param host A host to be used in the URL to MR. Can be "host:port". Use
+ * multiple entries to enable failover.
+ * @param topic The topic on which to publish messages.
+ * @param username username
+ * @param password password
+ * @param maxBatchSize The largest set of messages to batch
+ * @param maxAgeMs The maximum age of a message waiting in a batch
+ * @param compress use gzip compression
+ * @param protocolFlag http auth or ueb auth or dme2 method
* @return MRBatchingPublisher obj
*/
public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
- final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
+ final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
.againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
.compress(compress).build();
@@ -433,14 +403,11 @@ public class MRClientFactory {
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
- *
- * @param props
- * props set all properties for publishing message
+ *
+ * @param props props set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
+ * @throws FileNotFoundException exc
+ * @throws IOException ioex
*/
public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
throws FileNotFoundException, IOException {
@@ -450,14 +417,11 @@ public class MRClientFactory {
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
- *
- * @param props
- * props set all properties for publishing message
+ *
+ * @param props props set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
+ * @throws FileNotFoundException exc
+ * @throws IOException ioex
*/
public static MRBatchingPublisher createBatchingPublisher(Properties props)
throws FileNotFoundException, IOException {
@@ -467,19 +431,16 @@ public class MRClientFactory {
/**
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
- *
- * @param producerFilePath
- * set all properties for publishing message
+ *
+ * @param producerFilePath set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
+ * @throws FileNotFoundException exc
+ * @throws IOException ioex
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
throws FileNotFoundException, IOException {
Properties props = new Properties();
- try(InputStream input = new FileInputStream(producerFilePath)) {
+ try (InputStream input = new FileInputStream(producerFilePath)) {
props.load(input);
}
return createBatchingPublisher(props);
@@ -488,19 +449,16 @@ public class MRClientFactory {
/**
* Create a publisher that will contain send methods that return response
* object to user.
- *
- * @param producerFilePath
- * set all properties for publishing message
+ *
+ * @param producerFilePath set all properties for publishing message
* @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
+ * @throws FileNotFoundException exc
+ * @throws IOException ioex
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
throws FileNotFoundException, IOException {
Properties props = new Properties();
- try(InputStream input = new FileInputStream(producerFilePath)) {
+ try (InputStream input = new FileInputStream(producerFilePath)) {
props.load(input);
}
return createBatchingPublisher(props, withResponse);
@@ -510,26 +468,32 @@ public class MRClientFactory {
throws FileNotFoundException, IOException {
assert props != null;
MRSimplerBatchPublisher pub;
+
+ String messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE);
+ if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
+ messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
+ }
+
if (withResponse) {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+ .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
+ Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
+ .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
+ .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
.withResponse(withResponse).build();
} else {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+ .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
+ Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
+ .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
+ .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
}
pub.setHost(props.getProperty(HOST));
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
pub.setAuthKey(props.getProperty(AUTH_KEY));
pub.setAuthDate(props.getProperty(AUTH_DATE));
@@ -542,7 +506,7 @@ public class MRClientFactory {
pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
pub.setProps(props);
prop = new Properties();
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
routeReader = new FileReader(new File(routeFilePath));
File fo = new File(routeFilePath);
@@ -555,14 +519,11 @@ public class MRClientFactory {
/**
* Create an identity manager client to work with API keys.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
+ *
+ * @param hostSet A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey Your API key
+ * @param apiSecret Your API secret
* @return an identity manager
*/
public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
@@ -578,14 +539,11 @@ public class MRClientFactory {
/**
* Create a topic manager for working with topics.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
+ *
+ * @param hostSet A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey Your API key
+ * @param apiSecret Your API secret
* @return a topic manager
*/
public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
@@ -601,7 +559,7 @@ public class MRClientFactory {
/**
* Inject a consumer. Used to support unit tests.
- *
+ *
* @param cc
*/
public static void $testInject(MRConsumer cc) {
@@ -609,13 +567,13 @@ public class MRClientFactory {
}
public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, int i, int j, String protocalFlag, String consumerFilePath) {
+ String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
MRConsumerImpl sub;
try {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
.setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
- .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+ .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
.setFilter(null).setApiKey_username(null).setApiSecret_password(null)
.createMRConsumerImpl();
} catch (MalformedURLException e) {
@@ -631,13 +589,13 @@ public class MRClientFactory {
}
public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, String protocalFlag, String consumerFilePath, int i, int j) {
+ String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
MRConsumerImpl sub;
try {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
.setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
- .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+ .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
.setFilter(null).setApiKey_username(null).setApiSecret_password(null)
.createMRConsumerImpl();
} catch (MalformedURLException e) {
@@ -654,7 +612,7 @@ public class MRClientFactory {
public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
Properties props = new Properties();
- try(InputStream input = new FileInputStream(consumerFilePath)) {
+ try (InputStream input = new FileInputStream(consumerFilePath)) {
props.load(input);
}
return createConsumer(props);
@@ -663,26 +621,29 @@ public class MRClientFactory {
public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
int timeout;
ValidatorUtil.validateSubscriber(props);
- if (props.getProperty("timeout") != null)
- timeout = Integer.parseInt(props.getProperty("timeout"));
- else
+ if (props.getProperty(TIMEOUT) != null) {
+ timeout = Integer.parseInt(props.getProperty(TIMEOUT));
+ } else {
timeout = -1;
+ }
int limit;
- if (props.getProperty("limit") != null)
- limit = Integer.parseInt(props.getProperty("limit"));
- else
+ if (props.getProperty(LIMIT) != null) {
+ limit = Integer.parseInt(props.getProperty(LIMIT));
+ } else {
limit = -1;
+ }
String group;
- if (props.getProperty("group") == null)
+ if (props.getProperty(GROUP) == null) {
group = UUID.randomUUID().toString();
- else
- group = props.getProperty("group");
+ } else {
+ group = props.getProperty(GROUP);
+ }
MRConsumerImpl sub = null;
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
.setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
.setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
- .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+ .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
.setFilter(props.getProperty(FILTER))
.setApiKey_username(props.getProperty(AUTH_KEY))
.setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
@@ -694,29 +655,29 @@ public class MRClientFactory {
sub = new MRConsumerImpl.MRConsumerImplBuilder()
.setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
.setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
- .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+ .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
.setFilter(props.getProperty(FILTER))
.setApiKey_username(props.getProperty(USERNAME))
.setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
sub.setUsername(props.getProperty(USERNAME));
sub.setPassword(props.getProperty(PASSWORD));
}
-
+
sub.setProps(props);
sub.setHost(props.getProperty(HOST));
sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
sub.setfFilter(props.getProperty(FILTER));
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
routeReader = new FileReader(new File(routeFilePath));
prop = new Properties();
File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
}
-
+
return sub;
}
}