From 7d45c179879363222fcf49b30f75837f66d7f423 Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Thu, 31 Aug 2017 10:44:28 -0500 Subject: Revert package name changes Reverted package name changes to avoid any potential issues. Renamed maven group id only. Issue-id: DMAAP-74 Change-Id: I36c2aef063050c265640b79e6dc0e8ab7add8d22 Signed-off-by: Varun Gudisena --- .../com/att/nsa/mr/client/MRClientFactory.java | 558 +++++++++++++++++++++ 1 file changed, 558 insertions(+) create mode 100644 src/main/java/com/att/nsa/mr/client/MRClientFactory.java (limited to 'src/main/java/com/att/nsa/mr/client/MRClientFactory.java') diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java new file mode 100644 index 0000000..59e472c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -0,0 +1,558 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.TreeSet; +import java.util.UUID; + +import javax.ws.rs.core.MultivaluedMap; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.impl.MRMetaClient; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +/** + * A factory for MR clients.
+ *
+ * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates + * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive + * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's + * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across + * them. Be sure to use a different ID for each instance.)
+ *
+ * Publishers + * + * @author author + */ +public class MRClientFactory +{ + public static MultivaluedMap HTTPHeadersMap; + public static Map DME2HeadersMap; + public static String routeFilePath; + + public static FileReader routeReader; + + public static FileWriter routeWriter= null; + public static Properties prop=null; + //routeReader= new FileReader(new File (routeFilePath)); + //props= new Properties(); + /** + * Create a consumer instance with the default timeout and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostList A comma separated list of hosts to use to connect to MR. + * You can include port numbers (3904 is the default). For example, "hostname:8080," + * + * @param topic The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer ( String hostList, String topic ) + { + return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); + } + + /** + * Create a consumer instance with the default timeout and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection hostSet, String topic ) + { + return createConsumer ( hostSet, topic, null ); + } + + /** + * Create a consumer instance with server-side filtering, the default timeout, and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param filter a filter to use on the server side + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection hostSet, String topic, String filter ) + { + return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId ) + { + return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) + { + return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. This consumer also uses + * server-side filtering. + * + * @param hostList A comma separated list of hosts to use to connect to MR. + * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) + { + return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, + consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. This consumer also uses + * server-side filtering. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) + { + if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; + try { + return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * Create a publisher that sends each message (or group of messages) immediately. Most + * applications should favor higher latency for much higher message throughput and the + * "simple publisher" is not a good choice. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @return a publisher + */ + public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) + { + return createBatchingPublisher ( hostlist, topic, 1, 1 ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. Message payloads are not compressed. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) + { + return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + final TreeSet hosts = new TreeSet (); + for ( String hp : hostSet ) + { + hosts.add ( hp ); + } + return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( Collection hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + return new MRSimplerBatchPublisher.Builder (). + againstUrls ( hostSet ). + onTopic ( topic ). + batchTo ( maxBatchSize, maxAgeMs ). + compress ( compress ). + build (); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param username username + * @param password password + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * @param protocolFlag http auth or ueb auth or dme2 method + * @param producerFilePath all properties for publisher + * @return MRBatchingPublisher obj + */ + public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) + { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(host)). + onTopic ( topic ). + batchTo ( maxBatchSize, maxAgeMs ). + compress ( compress ). + build (); + + pub.setHost(host); + pub.setUsername(username); + pub.setPassword(password); + pub.setProtocolFlag(protocolFlag); + pub.setProducerFilePath(producerFilePath); + return pub; + } + + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown + * @param producerFilePath set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException exc + * @throws IOException ioex + */ + public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (producerFilePath)); + Properties props = new Properties(); + props.load(reader); + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). + onTopic ( props.getProperty("topic") ). + batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). + compress (Boolean.parseBoolean(props.getProperty("compress"))). + httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). + build (); + pub.setHost(props.getProperty("host")); + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + + pub.setAuthKey(props.getProperty("authKey")); + pub.setAuthDate(props.getProperty("authDate")); + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + }else{ + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + } + pub.setProducerFilePath(producerFilePath); + pub.setProtocolFlag(props.getProperty("TransportType")); + pub.setProps(props); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + //pub.setContentType(contentType); + return pub; + } + + /** + * Create a publisher that will contain send methods that return + * response object to user. + * @param producerFilePath set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException exc + * @throws IOException ioex + */ + public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (producerFilePath)); + Properties props = new Properties(); + props.load(reader); + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). + onTopic ( props.getProperty("topic") ). + batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). + compress (Boolean.parseBoolean(props.getProperty("compress"))). + httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). + withResponse(withResponse). + build (); + pub.setHost(props.getProperty("host")); + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + + pub.setAuthKey(props.getProperty("authKey")); + pub.setAuthDate(props.getProperty("authDate")); + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + }else{ + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + } + pub.setProducerFilePath(producerFilePath); + pub.setProtocolFlag(props.getProperty("TransportType")); + pub.setProps(props); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + //pub.setContentType(contentType); + return pub; + } + + + + + + + + + + + + /** + * Create an identity manager client to work with API keys. + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret + * @return an identity manager + */ + public static MRIdentityManager createIdentityManager ( Collection hostSet, String apiKey, String apiSecret ) + { + MRIdentityManager cim; + try { + cim = new MRMetaClient ( hostSet ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + cim.setApiCredentials ( apiKey, apiSecret ); + return cim; + } + + /** + * Create a topic manager for working with topics. + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret + * @return a topic manager + */ + public static MRTopicManager createTopicManager ( Collection hostSet, String apiKey, String apiSecret ) + { + MRMetaClient tmi; + try { + tmi = new MRMetaClient ( hostSet ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + tmi.setApiCredentials ( apiKey, apiSecret ); + return tmi; + } + + /** + * Inject a consumer. Used to support unit tests. + * @param cc + */ + public static void $testInject ( MRConsumer cc ) + { + MRClientBuilders.sfConsumerMock = cc; + } + + public static MRConsumer createConsumer(String host, String topic, String username, + String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String host, String topic, String username, + String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (consumerFilePath)); + Properties props = new Properties(); + props.load(reader); + int timeout; + if(props.getProperty("timeout")!=null) + timeout=Integer.parseInt(props.getProperty("timeout")); + else + timeout=-1; + int limit; + if(props.getProperty("limit")!=null) + limit=Integer.parseInt(props.getProperty("limit")); + else + limit=-1; + String group; + if(props.getProperty("group")==null) + group=UUID.randomUUID ().toString(); + else + group=props.getProperty("group"); + MRConsumerImpl sub=null; + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); + sub.setAuthKey(props.getProperty("authKey")); + sub.setAuthDate(props.getProperty("authDate")); + sub.setUsername(props.getProperty("username")); + sub.setPassword(props.getProperty("password")); + }else{ + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); + sub.setUsername(props.getProperty("username")); + sub.setPassword(props.getProperty("password")); + } + sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); + sub.setProps(props); + sub.setHost(props.getProperty("host")); + sub.setProtocolFlag(props.getProperty("TransportType")); + //sub.setConsumerFilePath(consumerFilePath); + sub.setfFilter(props.getProperty("filter")); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + return sub; + } +} -- cgit 1.2.3-korg