diff options
author | sliard <samuel.liard@gmail.com> | 2021-04-12 15:58:22 +0200 |
---|---|---|
committer | Fiachra Corcoran <fiachra.corcoran@est.tech> | 2021-04-15 16:48:33 +0000 |
commit | 72a9ab9e886cdeabc4b43418a7054a5796a0ff55 (patch) | |
tree | 8617e503cce134f06c10507cf99d43cd4e1eedcc /src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java | |
parent | 78ebc9a64fac6231e3e594200b9335a4c6372ed1 (diff) |
[DMAAP-CLIENT] First sonar issues review part2
update Copyright informations
fix checkstyle warning and fix code review comments
remove Prop
Issue-ID: DMAAP-1585
Change-Id: I445ca5d0888a555acbac70af7ed571be26d74f79
Signed-off-by: sliard <samuel.liard@gmail.com>
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java | 499 |
1 files changed, 230 insertions, 269 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java index 1780703..88d3dab 100644 --- a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java +++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java @@ -5,12 +5,13 @@ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Modifications Copyright © 2018 IBM. - * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,22 +20,30 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.TreeSet; import java.util.UUID; import javax.ws.rs.core.MultivaluedMap; + import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRMetaClient; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.onap.dmaap.mr.tools.ValidatorUtil; /** @@ -50,10 +59,12 @@ import org.onap.dmaap.mr.tools.ValidatorUtil; * instance.)<br/> * <br/> * Publishers - * + * * @author author */ public class MRClientFactory { + + private static final String ID = "id"; private static final String AUTH_KEY = "authKey"; private static final String AUTH_DATE = "authDate"; private static final String PASSWORD = "password"; @@ -63,6 +74,36 @@ public class MRClientFactory { private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath"; private static final String TOPIC = "topic"; private static final String TRANSPORT_TYPE = "TransportType"; + private static final String MAX_BATCH_SIZE = "maxBatchSize"; + private static final String MAX_AGE_MS = "maxAgeMs"; + private static final String MESSAGE_SENT_THREAD_OCCURRENCE_OLD = "MessageSentThreadOccurance"; + private static final String MESSAGE_SENT_THREAD_OCCURRENCE = "MessageSentThreadOccurrence"; + private static final String GROUP = "group"; + private static final String SERVICE_NAME = "ServiceName"; + private static final String PARTNER = "Partner"; + private static final String ROUTE_OFFER = "routeOffer"; + private static final String PROTOCOL = "Protocol"; + private static final String METHOD_TYPE = "MethodType"; + private static final String CONTENT_TYPE = "contenttype"; + private static final String LATITUDE = "Latitude"; + private static final String LONGITUDE = "Longitude"; + private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT"; + private static final String VERSION = "Version"; + private static final String ENVIRONMENT = "Environment"; + private static final String SUB_CONTEXT_PATH = "SubContextPath"; + private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired"; + private static final String PARTITION = "partition"; + private static final String COMPRESS = "compress"; + private static final String TIMEOUT = "timeout"; + private static final String LIMIT = "limit"; + private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS"; + private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT"; + private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS"; + private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS"; + private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON"; + private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS"; + private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS"; private static MultivaluedMap<String, Object> httpHeadersMap; public static Map<String, String> DME2HeadersMap; @@ -82,6 +123,7 @@ public class MRClientFactory { /** * Add getter to avoid direct access to static header map. + * * @return */ public static MultivaluedMap<String, Object> getHTTPHeadersMap() { @@ -90,6 +132,7 @@ public class MRClientFactory { /** * Add setter to avoid direct access to static header map. + * * @param headers */ public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) { @@ -100,15 +143,11 @@ public class MRClientFactory { * Create a consumer instance with the default timeout and no limit on * messages returned. This consumer operates as an independent consumer * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "hostname:8080," - * - * @param topic - * The topic to consume - * + * + * @param hostList A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default). For example, + * "hostname:8080," + * @param topic The topic to consume * @return a consumer */ public static MRConsumer createConsumer(String hostList, String topic) { @@ -119,12 +158,9 @@ public class MRClientFactory { * Create a consumer instance with the default timeout and no limit on * messages returned. This consumer operates as an independent consumer * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume * @return a consumer */ public static MRConsumer createConsumer(Collection<String> hostSet, String topic) { @@ -136,14 +172,10 @@ public class MRClientFactory { * timeout, and no limit on messages returned. This consumer operates as an * independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param filter - * a filter to use on the server side - * + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param filter a filter to use on the server side * @return a consumer */ public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) { @@ -155,20 +187,15 @@ public class MRClientFactory { * messages returned. This consumer can operate in a logical group and is * re-startable across sessions when you use the same group and ID on * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group * @return a consumer */ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId) { + final String consumerId) { return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); } @@ -177,27 +204,20 @@ public class MRClientFactory { * messages returned. This consumer can operate in a logical group and is * re-startable across sessions when you use the same group and ID on * restart. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit A limit on the number of messages returned in a single call. + * Use -1 for no limit. * @return a consumer */ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit) { + final String consumerId, int timeoutMs, int limit) { return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null); } @@ -206,31 +226,23 @@ public class MRClientFactory { * messages returned. This consumer can operate in a logical group and is * re-startable across sessions when you use the same group and ID on * restart. This consumer also uses server-side filtering. - * - * @param hostList - * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default)" - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * + * + * @param hostList A comma separated list of hosts to use to connect to MR. You + * can include port numbers (3904 is the default)" + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * @return a consumer */ public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret); } @@ -240,32 +252,25 @@ public class MRClientFactory { * messages returned. This consumer can operate in a logical group and is * re-startable across sessions when you use the same group and ID on * restart. This consumer also uses server-side filtering. - * - * @param hostSet - * The host used in the URL to MR. Entries can be "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". * @return a consumer */ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { - if (MRClientBuilders.sfConsumerMock != null) + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) { + if (MRClientBuilders.sfConsumerMock != null) { return MRClientBuilders.sfConsumerMock; + } try { return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic) .setConsumerGroup(consumerGroup).setConsumerId(consumerId) @@ -277,21 +282,19 @@ public class MRClientFactory { } } - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ + //************************************************************************* + //************************************************************************* + //************************************************************************* /** * Create a publisher that sends each message (or group of messages) * immediately. Most applications should favor higher latency for much * higher message throughput and the "simple publisher" is not a good * choice. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic The topic on which to publish messages. * @return a publisher */ public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) { @@ -302,94 +305,69 @@ public class MRClientFactory { * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown. Message payloads are * not compressed. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs) { + long maxAgeMs) { return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false); } /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown. - * - * @param hostlist - * The host used in the URL to MR. Can be "host:port", can be - * multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be + * multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { + long maxAgeMs, boolean compress) { return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress); } /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { + long maxAgeMs, boolean compress) { final TreeSet<String> hosts = new TreeSet<>(); - for (String hp : hostSet) { - hosts.add(hp); - } + Collections.addAll(hosts, hostSet); return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress); } /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic, - int maxBatchSize, long maxAgeMs, boolean compress) { + int maxBatchSize, long maxAgeMs, boolean compress) { return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) .compress(compress).build(); } @@ -397,28 +375,20 @@ public class MRClientFactory { /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown. - * - * @param host - * A host to be used in the URL to MR. Can be "host:port". Use - * multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param username - * username - * @param password - * password - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * @param protocolFlag - * http auth or ueb auth or dme2 method + * + * @param host A host to be used in the URL to MR. Can be "host:port". Use + * multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param username username + * @param password password + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * @param protocolFlag http auth or ueb auth or dme2 method * @return MRBatchingPublisher obj */ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, - final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) { + final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) { MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs) .compress(compress).build(); @@ -433,14 +403,11 @@ public class MRClientFactory { /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown - * - * @param props - * props set all properties for publishing message + * + * @param props props set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex + * @throws FileNotFoundException exc + * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse) throws FileNotFoundException, IOException { @@ -450,14 +417,11 @@ public class MRClientFactory { /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown - * - * @param props - * props set all properties for publishing message + * + * @param props props set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex + * @throws FileNotFoundException exc + * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher(Properties props) throws FileNotFoundException, IOException { @@ -467,19 +431,16 @@ public class MRClientFactory { /** * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown - * - * @param producerFilePath - * set all properties for publishing message + * + * @param producerFilePath set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex + * @throws FileNotFoundException exc + * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) throws FileNotFoundException, IOException { Properties props = new Properties(); - try(InputStream input = new FileInputStream(producerFilePath)) { + try (InputStream input = new FileInputStream(producerFilePath)) { props.load(input); } return createBatchingPublisher(props); @@ -488,19 +449,16 @@ public class MRClientFactory { /** * Create a publisher that will contain send methods that return response * object to user. - * - * @param producerFilePath - * set all properties for publishing message + * + * @param producerFilePath set all properties for publishing message * @return MRBatchingPublisher obj - * @throws FileNotFoundException - * exc - * @throws IOException - * ioex + * @throws FileNotFoundException exc + * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) throws FileNotFoundException, IOException { Properties props = new Properties(); - try(InputStream input = new FileInputStream(producerFilePath)) { + try (InputStream input = new FileInputStream(producerFilePath)) { props.load(input); } return createBatchingPublisher(props, withResponse); @@ -510,26 +468,32 @@ public class MRClientFactory { throws FileNotFoundException, IOException { assert props != null; MRSimplerBatchPublisher pub; + + String messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE); + if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) { + messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE_OLD); + } + if (withResponse) { pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE)) .onTopic(props.getProperty(TOPIC)) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))) + .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)), + Integer.parseInt(props.getProperty(MAX_AGE_MS).toString())) + .compress(Boolean.parseBoolean(props.getProperty(COMPRESS))) + .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)) .withResponse(withResponse).build(); } else { pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE)) .onTopic(props.getProperty(TOPIC)) - .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), - Integer.parseInt(props.getProperty("maxAgeMs").toString())) - .compress(Boolean.parseBoolean(props.getProperty("compress"))) - .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); + .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)), + Integer.parseInt(props.getProperty(MAX_AGE_MS).toString())) + .compress(Boolean.parseBoolean(props.getProperty(COMPRESS))) + .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build(); } pub.setHost(props.getProperty(HOST)); - if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) { pub.setAuthKey(props.getProperty(AUTH_KEY)); pub.setAuthDate(props.getProperty(AUTH_DATE)); @@ -542,7 +506,7 @@ public class MRClientFactory { pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); pub.setProps(props); prop = new Properties(); - if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) { routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); routeReader = new FileReader(new File(routeFilePath)); File fo = new File(routeFilePath); @@ -555,14 +519,11 @@ public class MRClientFactory { /** * Create an identity manager client to work with API keys. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret * @return an identity manager */ public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) { @@ -578,14 +539,11 @@ public class MRClientFactory { /** * Create a topic manager for working with topics. - * - * @param hostSet - * A set of hosts to be used in the URL to MR. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret * @return a topic manager */ public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) { @@ -601,7 +559,7 @@ public class MRClientFactory { /** * Inject a consumer. Used to support unit tests. - * + * * @param cc */ public static void $testInject(MRConsumer cc) { @@ -609,13 +567,13 @@ public class MRClientFactory { } public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, int i, int j, String protocalFlag, String consumerFilePath) { + String id, int timeout, int limit, String protocalFlag, String consumerFilePath) { MRConsumerImpl sub; try { sub = new MRConsumerImpl.MRConsumerImplBuilder() .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic) - .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j) + .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit) .setFilter(null).setApiKey_username(null).setApiSecret_password(null) .createMRConsumerImpl(); } catch (MalformedURLException e) { @@ -631,13 +589,13 @@ public class MRClientFactory { } public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, - String id, String protocalFlag, String consumerFilePath, int i, int j) { + String id, String protocalFlag, String consumerFilePath, int timeout, int limit) { MRConsumerImpl sub; try { sub = new MRConsumerImpl.MRConsumerImplBuilder() .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic) - .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j) + .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit) .setFilter(null).setApiKey_username(null).setApiSecret_password(null) .createMRConsumerImpl(); } catch (MalformedURLException e) { @@ -654,7 +612,7 @@ public class MRClientFactory { public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { Properties props = new Properties(); - try(InputStream input = new FileInputStream(consumerFilePath)) { + try (InputStream input = new FileInputStream(consumerFilePath)) { props.load(input); } return createConsumer(props); @@ -663,26 +621,29 @@ public class MRClientFactory { public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { int timeout; ValidatorUtil.validateSubscriber(props); - if (props.getProperty("timeout") != null) - timeout = Integer.parseInt(props.getProperty("timeout")); - else + if (props.getProperty(TIMEOUT) != null) { + timeout = Integer.parseInt(props.getProperty(TIMEOUT)); + } else { timeout = -1; + } int limit; - if (props.getProperty("limit") != null) - limit = Integer.parseInt(props.getProperty("limit")); - else + if (props.getProperty(LIMIT) != null) { + limit = Integer.parseInt(props.getProperty(LIMIT)); + } else { limit = -1; + } String group; - if (props.getProperty("group") == null) + if (props.getProperty(GROUP) == null) { group = UUID.randomUUID().toString(); - else - group = props.getProperty("group"); + } else { + group = props.getProperty(GROUP); + } MRConsumerImpl sub = null; - if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) { sub = new MRConsumerImpl.MRConsumerImplBuilder() .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST))) .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group) - .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit) + .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit) .setFilter(props.getProperty(FILTER)) .setApiKey_username(props.getProperty(AUTH_KEY)) .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl(); @@ -694,29 +655,29 @@ public class MRClientFactory { sub = new MRConsumerImpl.MRConsumerImplBuilder() .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST))) .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group) - .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit) + .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit) .setFilter(props.getProperty(FILTER)) .setApiKey_username(props.getProperty(USERNAME)) .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl(); sub.setUsername(props.getProperty(USERNAME)); sub.setPassword(props.getProperty(PASSWORD)); } - + sub.setProps(props); sub.setHost(props.getProperty(HOST)); sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); sub.setfFilter(props.getProperty(FILTER)); - if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) { MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH)); routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); routeReader = new FileReader(new File(routeFilePath)); prop = new Properties(); File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); - } + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } } - + return sub; } } |