/******************************************************************************* * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ package com.att.nsa.mr.client; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.net.MalformedURLException; import java.util.Collection; import java.util.Map; import java.util.Properties; import java.util.TreeSet; import java.util.UUID; import javax.ws.rs.core.MultivaluedMap; import com.att.nsa.mr.client.impl.MRConsumerImpl; import com.att.nsa.mr.client.impl.MRMetaClient; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; /** * A factory for MR clients.
*
* Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across * them. Be sure to use a different ID for each instance.)
*
* Publishers * * @author author */ public class MRClientFactory { public static MultivaluedMap HTTPHeadersMap; public static Map DME2HeadersMap; public static String routeFilePath; public static FileReader routeReader; public static FileWriter routeWriter= null; public static Properties prop=null; //routeReader= new FileReader(new File (routeFilePath)); //props= new Properties(); /** * Create a consumer instance with the default timeout and no limit * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. * * @param hostList A comma separated list of hosts to use to connect to MR. * You can include port numbers (3904 is the default). For example, "hostname:8080," * * @param topic The topic to consume * * @return a consumer */ public static MRConsumer createConsumer ( String hostList, String topic ) { return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); } /** * Create a consumer instance with the default timeout and no limit * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. * * @param hostSet The host used in the URL to MR. Entries can be "host:port". * @param topic The topic to consume * * @return a consumer */ public static MRConsumer createConsumer ( Collection hostSet, String topic ) { return createConsumer ( hostSet, topic, null ); } /** * Create a consumer instance with server-side filtering, the default timeout, and no limit * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable * across sessions. * * @param hostSet The host used in the URL to MR. Entries can be "host:port". * @param topic The topic to consume * @param filter a filter to use on the server side * * @return a consumer */ public static MRConsumer createConsumer ( Collection hostSet, String topic, String filter ) { return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); } /** * Create a consumer instance with the default timeout, and no limit * on messages returned. This consumer can operate in a logical group and is re-startable * across sessions when you use the same group and ID on restart. * * @param hostSet The host used in the URL to MR. Entries can be "host:port". * @param topic The topic to consume * @param consumerGroup The name of the consumer group this consumer is part of * @param consumerId The unique id of this consume in its group * * @return a consumer */ public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId ) { return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); } /** * Create a consumer instance with the default timeout, and no limit * on messages returned. This consumer can operate in a logical group and is re-startable * across sessions when you use the same group and ID on restart. * * @param hostSet The host used in the URL to MR. Entries can be "host:port". * @param topic The topic to consume * @param consumerGroup The name of the consumer group this consumer is part of * @param consumerId The unique id of this consume in its group * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. * * @return a consumer */ public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) { return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); } /** * Create a consumer instance with the default timeout, and no limit * on messages returned. This consumer can operate in a logical group and is re-startable * across sessions when you use the same group and ID on restart. This consumer also uses * server-side filtering. * * @param hostList A comma separated list of hosts to use to connect to MR. * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" * @param topic The topic to consume * @param consumerGroup The name of the consumer group this consumer is part of * @param consumerId The unique id of this consume in its group * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". * * @return a consumer */ public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) { return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); } /** * Create a consumer instance with the default timeout, and no limit * on messages returned. This consumer can operate in a logical group and is re-startable * across sessions when you use the same group and ID on restart. This consumer also uses * server-side filtering. * * @param hostSet The host used in the URL to MR. Entries can be "host:port". * @param topic The topic to consume * @param consumerGroup The name of the consumer group this consumer is part of * @param consumerId The unique id of this consume in its group * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". * * @return a consumer */ public static MRConsumer createConsumer ( Collection hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) { if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; try { return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); } catch (MalformedURLException e) { throw new RuntimeException(e); } } /*************************************************************************/ /*************************************************************************/ /*************************************************************************/ /** * Create a publisher that sends each message (or group of messages) immediately. Most * applications should favor higher latency for much higher message throughput and the * "simple publisher" is not a good choice. * * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. * @param topic The topic on which to publish messages. * @return a publisher */ public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) { return createBatchingPublisher ( hostlist, topic, 1, 1 ); } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown. Message payloads are not compressed. * * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. * @param topic The topic on which to publish messages. * @param maxBatchSize The largest set of messages to batch * @param maxAgeMs The maximum age of a message waiting in a batch * * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) { return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown. * * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. * @param topic The topic on which to publish messages. * @param maxBatchSize The largest set of messages to batch * @param maxAgeMs The maximum age of a message waiting in a batch * @param compress use gzip compression * * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) { return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown. * * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. * @param topic The topic on which to publish messages. * @param maxBatchSize The largest set of messages to batch * @param maxAgeMs The maximum age of a message waiting in a batch * @param compress use gzip compression * * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) { final TreeSet hosts = new TreeSet (); for ( String hp : hostSet ) { hosts.add ( hp ); } return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown. * * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. * @param topic The topic on which to publish messages. * @param maxBatchSize The largest set of messages to batch * @param maxAgeMs The maximum age of a message waiting in a batch * @param compress use gzip compression * * @return a publisher */ public static MRBatchingPublisher createBatchingPublisher ( Collection hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) { return new MRSimplerBatchPublisher.Builder (). againstUrls ( hostSet ). onTopic ( topic ). batchTo ( maxBatchSize, maxAgeMs ). compress ( compress ). build (); } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown. * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. * @param topic The topic on which to publish messages. * @param username username * @param password password * @param maxBatchSize The largest set of messages to batch * @param maxAgeMs The maximum age of a message waiting in a batch * @param compress use gzip compression * @param protocolFlag http auth or ueb auth or dme2 method * @param producerFilePath all properties for publisher * @return MRBatchingPublisher obj */ public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) { MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). againstUrls(MRConsumerImpl.stringToList(host)). onTopic ( topic ). batchTo ( maxBatchSize, maxAgeMs ). compress ( compress ). build (); pub.setHost(host); pub.setUsername(username); pub.setPassword(password); pub.setProtocolFlag(protocolFlag); pub.setProducerFilePath(producerFilePath); return pub; } /** * Create a publisher that batches messages. Be sure to close the publisher to * send the last batch and ensure a clean shutdown * @param producerFilePath set all properties for publishing message * @return MRBatchingPublisher obj * @throws FileNotFoundException exc * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { FileReader reader = new FileReader(new File (producerFilePath)); Properties props = new Properties(); props.load(reader); MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). onTopic ( props.getProperty("topic") ). batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). compress (Boolean.parseBoolean(props.getProperty("compress"))). httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). build (); pub.setHost(props.getProperty("host")); if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ pub.setAuthKey(props.getProperty("authKey")); pub.setAuthDate(props.getProperty("authDate")); pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); }else{ pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); } pub.setProducerFilePath(producerFilePath); pub.setProtocolFlag(props.getProperty("TransportType")); pub.setProps(props); routeFilePath=props.getProperty("DME2preferredRouterFilePath"); routeReader= new FileReader(new File (routeFilePath)); prop= new Properties(); File fo= new File(routeFilePath); if(!fo.exists()){ routeWriter=new FileWriter(new File(routeFilePath)); } //pub.setContentType(contentType); return pub; } /** * Create a publisher that will contain send methods that return * response object to user. * @param producerFilePath set all properties for publishing message * @return MRBatchingPublisher obj * @throws FileNotFoundException exc * @throws IOException ioex */ public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { FileReader reader = new FileReader(new File (producerFilePath)); Properties props = new Properties(); props.load(reader); MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). onTopic ( props.getProperty("topic") ). batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). compress (Boolean.parseBoolean(props.getProperty("compress"))). httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). withResponse(withResponse). build (); pub.setHost(props.getProperty("host")); if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ pub.setAuthKey(props.getProperty("authKey")); pub.setAuthDate(props.getProperty("authDate")); pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); }else{ pub.setUsername(props.getProperty("username")); pub.setPassword(props.getProperty("password")); } pub.setProducerFilePath(producerFilePath); pub.setProtocolFlag(props.getProperty("TransportType")); pub.setProps(props); routeFilePath=props.getProperty("DME2preferredRouterFilePath"); routeReader= new FileReader(new File (routeFilePath)); prop= new Properties(); File fo= new File(routeFilePath); if(!fo.exists()){ routeWriter=new FileWriter(new File(routeFilePath)); } //pub.setContentType(contentType); return pub; } /** * Create an identity manager client to work with API keys. * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. * @param apiKey Your API key * @param apiSecret Your API secret * @return an identity manager */ public static MRIdentityManager createIdentityManager ( Collection hostSet, String apiKey, String apiSecret ) { MRIdentityManager cim; try { cim = new MRMetaClient ( hostSet ); } catch (MalformedURLException e) { throw new RuntimeException(e); } cim.setApiCredentials ( apiKey, apiSecret ); return cim; } /** * Create a topic manager for working with topics. * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. * @param apiKey Your API key * @param apiSecret Your API secret * @return a topic manager */ public static MRTopicManager createTopicManager ( Collection hostSet, String apiKey, String apiSecret ) { MRMetaClient tmi; try { tmi = new MRMetaClient ( hostSet ); } catch (MalformedURLException e) { throw new RuntimeException(e); } tmi.setApiCredentials ( apiKey, apiSecret ); return tmi; } /** * Inject a consumer. Used to support unit tests. * @param cc */ public static void $testInject ( MRConsumer cc ) { MRClientBuilders.sfConsumerMock = cc; } public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { MRConsumerImpl sub; try { sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); } catch (MalformedURLException e) { throw new RuntimeException(e); } sub.setUsername(username); sub.setPassword(password); sub.setHost(host); sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; } public static MRConsumer createConsumer(String host, String topic, String username, String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { MRConsumerImpl sub; try { sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); } catch (MalformedURLException e) { throw new RuntimeException(e); } sub.setUsername(username); sub.setPassword(password); sub.setHost(host); sub.setProtocolFlag(protocalFlag); sub.setConsumerFilePath(consumerFilePath); return sub; } public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { FileReader reader = new FileReader(new File (consumerFilePath)); Properties props = new Properties(); props.load(reader); int timeout; if(props.getProperty("timeout")!=null) timeout=Integer.parseInt(props.getProperty("timeout")); else timeout=-1; int limit; if(props.getProperty("limit")!=null) limit=Integer.parseInt(props.getProperty("limit")); else limit=-1; String group; if(props.getProperty("group")==null) group=UUID.randomUUID ().toString(); else group=props.getProperty("group"); MRConsumerImpl sub=null; if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); sub.setAuthKey(props.getProperty("authKey")); sub.setAuthDate(props.getProperty("authDate")); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); }else{ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); sub.setUsername(props.getProperty("username")); sub.setPassword(props.getProperty("password")); } sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); sub.setProps(props); sub.setHost(props.getProperty("host")); sub.setProtocolFlag(props.getProperty("TransportType")); //sub.setConsumerFilePath(consumerFilePath); sub.setfFilter(props.getProperty("filter")); routeFilePath=props.getProperty("DME2preferredRouterFilePath"); routeReader= new FileReader(new File (routeFilePath)); prop= new Properties(); File fo= new File(routeFilePath); if(!fo.exists()){ routeWriter=new FileWriter(new File(routeFilePath)); } return sub; } }