diff options
Diffstat (limited to 'src/main/java/com')
45 files changed, 7457 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/HostSelector.java b/src/main/java/com/att/nsa/mr/client/HostSelector.java new file mode 100644 index 0000000..de0e52c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/HostSelector.java @@ -0,0 +1,191 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.Vector; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HostSelector +{ + private final TreeSet<String> fBaseHosts; + private final DelayQueue<BlacklistEntry> fBlacklist; + private String fIdealHost; + private String fCurrentHost; + private static final Logger log = LoggerFactory.getLogger(HostSelector.class); + + public HostSelector(String hostPart) + { + this(makeSet(hostPart), null); + } + + public HostSelector(Collection<String> baseHosts) + { + this(baseHosts, null); + } + + public HostSelector(Collection<String> baseHosts, String signature) + { + if (baseHosts.size() < 1) + { + throw new IllegalArgumentException("At least one host must be provided."); + } + + this.fBaseHosts = new TreeSet(baseHosts); + this.fBlacklist = new DelayQueue(); + this.fIdealHost = null; + + if (signature == null) { + return; + } + int index = Math.abs(signature.hashCode()) % baseHosts.size(); + + Iterator it = this.fBaseHosts.iterator(); + while (index-- > 0) + { + it.next(); + } + this.fIdealHost = ((String)it.next()); + } + + public String selectBaseHost() + { + if (this.fCurrentHost == null) + { + makeSelection(); + } + return this.fCurrentHost; + } + + public void reportReachabilityProblem(long blacklistUnit, TimeUnit blacklistTimeUnit) + { + if (this.fCurrentHost == null) + { + log.warn("Reporting reachability problem, but no host is currently selected."); + } + + if (blacklistUnit > 0L) + { + for (BlacklistEntry be : this.fBlacklist) + { + if (be.getHost().equals(this.fCurrentHost)) + { + be.expireNow(); + } + } + + LinkedList devNull = new LinkedList(); + this.fBlacklist.drainTo(devNull); + + if (this.fCurrentHost != null) + { + this.fBlacklist.add(new BlacklistEntry(this.fCurrentHost, TimeUnit.MILLISECONDS.convert(blacklistUnit, blacklistTimeUnit))); + } + } + this.fCurrentHost = null; + } + + private String makeSelection() + { + TreeSet workingSet = new TreeSet(this.fBaseHosts); + + LinkedList devNull = new LinkedList(); + this.fBlacklist.drainTo(devNull); + for (BlacklistEntry be : this.fBlacklist) + { + workingSet.remove(be.getHost()); + } + + if (workingSet.size() == 0) + { + log.warn("All hosts were blacklisted; reverting to full set of hosts."); + workingSet.addAll(this.fBaseHosts); + this.fCurrentHost = null; + } + + String selection = null; + if ((this.fCurrentHost != null) && (workingSet.contains(this.fCurrentHost))) + { + selection = this.fCurrentHost; + } + else if ((this.fIdealHost != null) && (workingSet.contains(this.fIdealHost))) + { + selection = this.fIdealHost; + } + else + { + Vector v = new Vector(workingSet); + int index = Math.abs(new Random().nextInt()) % workingSet.size(); + selection = (String)v.elementAt(index); + } + + this.fCurrentHost = selection; + return this.fCurrentHost; + } + + private static Set<String> makeSet(String s) + { + TreeSet set = new TreeSet(); + set.add(s); + return set; } + + private static class BlacklistEntry implements Delayed { + private final String fHost; + private long fExpireAtMs; + + public BlacklistEntry(String host, long delayMs) { + this.fHost = host; + this.fExpireAtMs = (System.currentTimeMillis() + delayMs); + } + + public void expireNow() + { + this.fExpireAtMs = 0L; + } + + public String getHost() + { + return this.fHost; + } + + public int compareTo(Delayed o) + { + Long thisDelay = Long.valueOf(getDelay(TimeUnit.MILLISECONDS)); + return thisDelay.compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS))); + } + + public long getDelay(TimeUnit unit) + { + long remainingMs = this.fExpireAtMs - System.currentTimeMillis(); + return unit.convert(remainingMs, TimeUnit.MILLISECONDS); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java b/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java new file mode 100644 index 0000000..875b5a3 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java @@ -0,0 +1,55 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import com.att.nsa.mr.client.response.MRPublisherResponse; + +/** + * A MR batching publisher is a publisher with additional functionality + * for managing delayed sends. + * + * @author author + * + */ +public interface MRBatchingPublisher extends MRPublisher +{ + /** + * Get the number of messages that have not yet been sent. + * @return the number of pending messages + */ + int getPendingMessageCount (); + + /** + * Close this publisher, sending any remaining messages. + * @param timeout an amount of time to wait for unsent messages to be sent + * @param timeoutUnits the time unit for the timeout arg + * @return a list of any unsent messages after the timeout + * @throws IOException exception + * @throws InterruptedException exception + */ + List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException; + + MRPublisherResponse sendBatchWithResponse (); +} diff --git a/src/main/java/com/att/nsa/mr/client/MRClient.java b/src/main/java/com/att/nsa/mr/client/MRClient.java new file mode 100644 index 0000000..f3d5c88 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRClient.java @@ -0,0 +1,66 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import org.slf4j.Logger; + + +public interface MRClient +{ + /** + * An exception at the MR layer. This is used when the HTTP transport + * layer returns a success code but the transaction is not completed as expected. + */ + public class MRApiException extends Exception + { + public MRApiException ( String msg ) { super ( msg ); } + public MRApiException ( String msg, Throwable t ) { super ( msg, t ); } + private static final long serialVersionUID = 1L; + } + + /** + * Optionally set the Logger to use + * @param log log + */ + void logTo ( Logger log ); + + /** + * Set the API credentials for this client connection. Subsequent calls will + * include authentication headers.who i + */ + /** + * @param apiKey apikey + * @param apiSecret apisec + */ + void setApiCredentials ( String apiKey, String apiSecret ); + + /** + * Remove API credentials, if any, on this connection. Subsequent calls will not include + * authentication headers. + */ + void clearApiCredentials (); + + /** + * Close this connection. Some client interfaces have additional close capability. + */ + void close (); +} diff --git a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java new file mode 100644 index 0000000..00c8915 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java @@ -0,0 +1,383 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.TreeSet; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.impl.MRMetaClient; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + +/** + * A collection of builders for various types of MR API clients + * + * @author author + */ +public class MRClientBuilders +{ + /** + * A builder for a topic Consumer + * @author author + */ + public static class ConsumerBuilder + { + /** + * Construct a consumer builder. + */ + public ConsumerBuilder () {} + + /** + * Set the host list + * @param hostList a comma-separated list of hosts to use to connect to MR + * @return this builder + */ + public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } + + /** + * Set the host list + * @param hostSet a set of hosts to use to connect to MR + * @return this builder + */ + public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } + + /** + * Set the topic + * @param topic the name of the topic to consume + * @return this builder + */ + public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; } + + /** + * Set the consumer's group and ID + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consumer in its group + * @return this builder + */ + public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } + + /** + * Set the server side timeout + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. + * @return this builder + */ + public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; }; + + /** + * Set the maximum number of messages to receive per transaction + * @param limit The maximum number of messages to receive from the server in one transaction. + * @return this builder + */ + public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; }; + + /** + * Set a filter to use on the server + * @param filter a Highland Park standard library filter encoded in JSON + * @return this builder + */ + public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; } + + /** + * Build the consumer + * @return a consumer + */ + public MRConsumer build () + { + if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + if ( fGroup == null ) + { + fGroup = UUID.randomUUID ().toString (); + fId = "0"; + log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." ); + } + + if ( sfConsumerMock != null ) return sfConsumerMock; + try { + return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + private Collection<String> fHosts = null; + private String fTopic = null; + private String fGroup = null; + private String fId = null; + private String fApiKey = null; + private String fApiSecret = null; + private int fTimeoutMs = -1; + private int fLimit = -1; + private String fFilter = null; + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * A publisher builder + * @author author + */ + public static class PublisherBuilder + { + public PublisherBuilder () {} + + /** + * Set the MR/UEB host(s) to use + * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @return this builder + */ + public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); } + + /** + * Set the MR/UEB host(s) to use + * @param hostSet The host(s) used in the URL to MR. Can be "host:port" + * @return this builder + */ + public PublisherBuilder usingHosts ( String[] hostSet ) + { + final TreeSet<String> hosts = new TreeSet<String> (); + for ( String hp : hostSet ) + { + hosts.add ( hp ); + } + return usingHosts ( hosts ); + } + + /** + * Set the MR/UEB host(s) to use + * @param hostlist The host(s) used in the URL to MR. Can be "host:port". + * @return this builder + */ + public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; } + + /** + * Set the topic to publish on + * @param topic The topic on which to publish messages. + * @return this builder + */ + public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; } + + /** + * Batch message sends with the given limits. + * @param messageCount The largest set of messages to batch. + * @param ageInMs The maximum age of a message waiting in a batch. + * @return this builder + */ + public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; } + + /** + * Compress transactions + * @return this builder + */ + public PublisherBuilder withCompresion () { return enableCompresion(true); } + + /** + * Do not compress transactions + * @return this builder + */ + public PublisherBuilder withoutCompresion () { return enableCompresion(false); } + + /** + * Set the compression option + * @param compress true to gzip compress transactions + * @return this builder + */ + public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } + + /** + * Build the publisher + * @return a batching publisher + */ + public MRBatchingPublisher build () + { + if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + if ( sfPublisherMock != null ) return sfPublisherMock; + + final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls ( fHosts ). + onTopic ( fTopic ). + batchTo ( fMaxBatchSize, fMaxBatchAgeMs ). + compress ( fCompress ). + build (); + if ( fApiKey != null ) + { + pub.setApiCredentials ( fApiKey, fApiSecret ); + } + return pub; + } + + private Collection<String> fHosts = null; + private String fTopic = null; + private int fMaxBatchSize = 1; + private int fMaxBatchAgeMs = 1; + private boolean fCompress = false; + private String fApiKey = null; + private String fApiSecret = null; + } + + /** + * A builder for an identity manager + * @author author + */ + public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> + { + /** + * Construct an identity manager builder. + */ + public IdentityManagerBuilder () {} + + @Override + protected MRIdentityManager constructClient ( Collection<String> hosts ) { try { + return new MRMetaClient ( hosts ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } } + } + + /** + * A builder for a topic manager + * @author author + */ + public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> + { + /** + * Construct an topic manager builder. + */ + public TopicManagerBuilder () {} + + @Override + protected MRTopicManager constructClient ( Collection<String> hosts ) { try { + return new MRMetaClient ( hosts ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } } + } + + /** + * Inject a consumer. Used to support unit tests. + * @param cc + */ + public static void $testInject ( MRConsumer cc ) + { + sfConsumerMock = cc; + } + + /** + * Inject a publisher. Used to support unit tests. + * @param pub + */ + public static void $testInject ( MRBatchingPublisher pub ) + { + sfPublisherMock = pub; + } + + static MRConsumer sfConsumerMock = null; + static MRBatchingPublisher sfPublisherMock = null; + + /** + * A builder for an identity manager + * @author author + */ + public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient> + { + /** + * Construct an identity manager builder. + */ + public AbstractAuthenticatedManagerBuilder () {} + + /** + * Set the host list + * @param hostList a comma-separated list of hosts to use to connect to MR + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } + + /** + * Set the host list + * @param hostSet a set of hosts to use to connect to MR + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } + + /** + * Set the API key and secret for this client. + * @param apiKey + * @param apiSecret + * @return this builder + */ + public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } + + /** + * Build the consumer + * @return a consumer + */ + public T build () + { + if ( fHosts == null || fHosts.size() == 0 ) + { + throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + } + + final T mgr = constructClient ( fHosts ); + mgr.setApiCredentials ( fApiKey, fApiSecret ); + return mgr; + } + + protected abstract T constructClient ( Collection<String> hosts ); + + private Collection<String> fHosts = null; + private String fApiKey = null; + private String fApiSecret = null; + } + + private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class ); +} diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java new file mode 100644 index 0000000..59e472c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -0,0 +1,558 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.TreeSet; +import java.util.UUID; + +import javax.ws.rs.core.MultivaluedMap; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.impl.MRMetaClient; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +/** + * A factory for MR clients.<br/> + * <br/> + * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates + * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive + * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's + * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across + * them. Be sure to use a different ID for each instance.)<br/> + * <br/> + * Publishers + * + * @author author + */ +public class MRClientFactory +{ + public static MultivaluedMap<String, Object> HTTPHeadersMap; + public static Map<String, String> DME2HeadersMap; + public static String routeFilePath; + + public static FileReader routeReader; + + public static FileWriter routeWriter= null; + public static Properties prop=null; + //routeReader= new FileReader(new File (routeFilePath)); + //props= new Properties(); + /** + * Create a consumer instance with the default timeout and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostList A comma separated list of hosts to use to connect to MR. + * You can include port numbers (3904 is the default). For example, "hostname:8080," + * + * @param topic The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer ( String hostList, String topic ) + { + return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); + } + + /** + * Create a consumer instance with the default timeout and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection<String> hostSet, String topic ) + { + return createConsumer ( hostSet, topic, null ); + } + + /** + * Create a consumer instance with server-side filtering, the default timeout, and no limit + * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param filter a filter to use on the server side + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter ) + { + return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId ) + { + return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) + { + return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. This consumer also uses + * server-side filtering. + * + * @param hostList A comma separated list of hosts to use to connect to MR. + * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) + { + return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, + consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + } + + /** + * Create a consumer instance with the default timeout, and no limit + * on messages returned. This consumer can operate in a logical group and is re-startable + * across sessions when you use the same group and ID on restart. This consumer also uses + * server-side filtering. + * + * @param hostSet The host used in the URL to MR. Entries can be "host:port". + * @param topic The topic to consume + * @param consumerGroup The name of the consumer group this consumer is part of + * @param consumerId The unique id of this consume in its group + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". + * + * @return a consumer + */ + public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) + { + if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; + try { + return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * Create a publisher that sends each message (or group of messages) immediately. Most + * applications should favor higher latency for much higher message throughput and the + * "simple publisher" is not a good choice. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @return a publisher + */ + public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) + { + return createBatchingPublisher ( hostlist, topic, 1, 1 ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. Message payloads are not compressed. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) + { + return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + final TreeSet<String> hosts = new TreeSet<String> (); + for ( String hp : hostSet ) + { + hosts.add ( hp ); + } + return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * + * @return a publisher + */ + public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + return new MRSimplerBatchPublisher.Builder (). + againstUrls ( hostSet ). + onTopic ( topic ). + batchTo ( maxBatchSize, maxAgeMs ). + compress ( compress ). + build (); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown. + * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param topic The topic on which to publish messages. + * @param username username + * @param password password + * @param maxBatchSize The largest set of messages to batch + * @param maxAgeMs The maximum age of a message waiting in a batch + * @param compress use gzip compression + * @param protocolFlag http auth or ueb auth or dme2 method + * @param producerFilePath all properties for publisher + * @return MRBatchingPublisher obj + */ + public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) + { + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(host)). + onTopic ( topic ). + batchTo ( maxBatchSize, maxAgeMs ). + compress ( compress ). + build (); + + pub.setHost(host); + pub.setUsername(username); + pub.setPassword(password); + pub.setProtocolFlag(protocolFlag); + pub.setProducerFilePath(producerFilePath); + return pub; + } + + + /** + * Create a publisher that batches messages. Be sure to close the publisher to + * send the last batch and ensure a clean shutdown + * @param producerFilePath set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException exc + * @throws IOException ioex + */ + public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (producerFilePath)); + Properties props = new Properties(); + props.load(reader); + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). + onTopic ( props.getProperty("topic") ). + batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). + compress (Boolean.parseBoolean(props.getProperty("compress"))). + httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). + build (); + pub.setHost(props.getProperty("host")); + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + + pub.setAuthKey(props.getProperty("authKey")); + pub.setAuthDate(props.getProperty("authDate")); + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + }else{ + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + } + pub.setProducerFilePath(producerFilePath); + pub.setProtocolFlag(props.getProperty("TransportType")); + pub.setProps(props); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + //pub.setContentType(contentType); + return pub; + } + + /** + * Create a publisher that will contain send methods that return + * response object to user. + * @param producerFilePath set all properties for publishing message + * @return MRBatchingPublisher obj + * @throws FileNotFoundException exc + * @throws IOException ioex + */ + public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (producerFilePath)); + Properties props = new Properties(); + props.load(reader); + MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). + againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). + onTopic ( props.getProperty("topic") ). + batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). + compress (Boolean.parseBoolean(props.getProperty("compress"))). + httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). + withResponse(withResponse). + build (); + pub.setHost(props.getProperty("host")); + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + + pub.setAuthKey(props.getProperty("authKey")); + pub.setAuthDate(props.getProperty("authDate")); + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + }else{ + pub.setUsername(props.getProperty("username")); + pub.setPassword(props.getProperty("password")); + } + pub.setProducerFilePath(producerFilePath); + pub.setProtocolFlag(props.getProperty("TransportType")); + pub.setProps(props); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + //pub.setContentType(contentType); + return pub; + } + + + + + + + + + + + + /** + * Create an identity manager client to work with API keys. + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret + * @return an identity manager + */ + public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret ) + { + MRIdentityManager cim; + try { + cim = new MRMetaClient ( hostSet ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + cim.setApiCredentials ( apiKey, apiSecret ); + return cim; + } + + /** + * Create a topic manager for working with topics. + * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. + * @param apiKey Your API key + * @param apiSecret Your API secret + * @return a topic manager + */ + public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret ) + { + MRMetaClient tmi; + try { + tmi = new MRMetaClient ( hostSet ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + tmi.setApiCredentials ( apiKey, apiSecret ); + return tmi; + } + + /** + * Inject a consumer. Used to support unit tests. + * @param cc + */ + public static void $testInject ( MRConsumer cc ) + { + MRClientBuilders.sfConsumerMock = cc; + } + + public static MRConsumer createConsumer(String host, String topic, String username, + String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String host, String topic, String username, + String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { + + MRConsumerImpl sub; + try { + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + sub.setUsername(username); + sub.setPassword(password); + sub.setHost(host); + sub.setProtocolFlag(protocalFlag); + sub.setConsumerFilePath(consumerFilePath); + return sub; + + } + + public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { + FileReader reader = new FileReader(new File (consumerFilePath)); + Properties props = new Properties(); + props.load(reader); + int timeout; + if(props.getProperty("timeout")!=null) + timeout=Integer.parseInt(props.getProperty("timeout")); + else + timeout=-1; + int limit; + if(props.getProperty("limit")!=null) + limit=Integer.parseInt(props.getProperty("limit")); + else + limit=-1; + String group; + if(props.getProperty("group")==null) + group=UUID.randomUUID ().toString(); + else + group=props.getProperty("group"); + MRConsumerImpl sub=null; + if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); + sub.setAuthKey(props.getProperty("authKey")); + sub.setAuthDate(props.getProperty("authDate")); + sub.setUsername(props.getProperty("username")); + sub.setPassword(props.getProperty("password")); + }else{ + sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); + sub.setUsername(props.getProperty("username")); + sub.setPassword(props.getProperty("password")); + } + sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); + sub.setProps(props); + sub.setHost(props.getProperty("host")); + sub.setProtocolFlag(props.getProperty("TransportType")); + //sub.setConsumerFilePath(consumerFilePath); + sub.setfFilter(props.getProperty("filter")); + routeFilePath=props.getProperty("DME2preferredRouterFilePath"); + routeReader= new FileReader(new File (routeFilePath)); + prop= new Properties(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File(routeFilePath)); + } + return sub; + } +} diff --git a/src/main/java/com/att/nsa/mr/client/MRConsumer.java b/src/main/java/com/att/nsa/mr/client/MRConsumer.java new file mode 100644 index 0000000..444eb7c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRConsumer.java @@ -0,0 +1,54 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.IOException; + +import com.att.nsa.mr.client.response.MRConsumerResponse; + +public interface MRConsumer extends MRClient +{ + /** + * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call. + + * @return a set of messages + * @throws IOException + */ + Iterable<String> fetch () throws IOException, Exception; + + /** + * Fetch a set of messages with an explicit timeout and limit for this call. These values + * override any set in the constructor call. + * + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection + * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side). + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @return a set messages + * @throws IOException if there's a problem connecting to the server + */ + Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException, Exception; + + MRConsumerResponse fetchWithReturnConsumerResponse (); + + + MRConsumerResponse fetchWithReturnConsumerResponse ( int timeoutMs, int limit ); +} diff --git a/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java b/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java new file mode 100644 index 0000000..5483761 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.IOException; + +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; + +/** + * A client for manipulating API keys. + * @author author + * + */ +public interface MRIdentityManager extends MRClient +{ + /** + * An API Key record + */ + public interface ApiKey + { + /** + * Get the email address associated with the API key + * @return the email address on the API key or null + */ + String getEmail (); + + /** + * Get the description associated with the API key + * @return the description on the API key or null + */ + String getDescription (); + } + + /** + * Create a new API key on the UEB cluster. The returned credential instance + * contains the new API key and API secret. This is the only time the secret + * is available to the client -- there's no API for retrieving it later -- so + * your application must store it securely. + * + * @param email + * @param description + * @return a new credential + * @throws HttpException + * @throws MRApiException + * @throws IOException + */ + ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException; + + /** + * Get basic info about a known API key + * @param apiKey + * @return the API key's info or null if it doesn't exist + * @throws HttpObjectNotFoundException, HttpException, MRApiException + * @throws IOException + */ + ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, MRApiException, IOException; + + /** + * Update the record for the API key used to authenticate this request. The UEB + * API requires that you authenticate with the same key you're updating, so the + * API key being changed is the one used for setApiCredentials. + * + * @param email use null to keep the current value + * @param description use null to keep the current value + * @throws IOException + * @throws HttpException + * @throws HttpObjectNotFoundException + */ + void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException; + + /** + * Delete the *current* API key. After this call returns, the API key + * used to authenticate will no longer be valid. + * + * @throws IOException + * @throws HttpException + */ + void deleteCurrentApiKey () throws HttpException, IOException; +} diff --git a/src/main/java/com/att/nsa/mr/client/MRPublisher.java b/src/main/java/com/att/nsa/mr/client/MRPublisher.java new file mode 100644 index 0000000..651c548 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRPublisher.java @@ -0,0 +1,93 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.IOException; +import java.util.Collection; + +/** + * A MR publishing interface. + * + */ +public interface MRPublisher extends MRClient +{ + /** + * A simple message container + */ + public static class message + { + public message ( String partition, String msg ) + { + fPartition = partition == null ? "" : partition; + fMsg = msg; + if ( fMsg == null ) + { + throw new IllegalArgumentException ( "Can't send a null message." ); + } + } + + public message ( message msg ) + { + this ( msg.fPartition, msg.fMsg ); + } + + public final String fPartition; + public final String fMsg; + } + + /** + * Send the given message without partition. partition will be placed at HTTP request level. + * @param msg message to sent + * @return the number of pending messages + * @throws IOException exception + */ + int send ( String msg ) throws IOException; + /** + * Send the given message using the given partition. + * @param partition partition + * @param msg message + * @return the number of pending messages + * @throws IOException exception + */ + int send ( String partition, String msg ) throws IOException; + + /** + * Send the given message using its partition. + * @param msg mesg + * @return the number of pending messages + * @throws IOException exp + */ + int send ( message msg ) throws IOException; + + /** + * Send the given messages using their partitions. + * @param msgs msg + * @return the number of pending messages + * @throws IOException exp + */ + int send ( Collection<message> msgs ) throws IOException; + + /** + * Close this publisher. It's an error to call send() after close() + */ + void close (); +} diff --git a/src/main/java/com/att/nsa/mr/client/MRTopicManager.java b/src/main/java/com/att/nsa/mr/client/MRTopicManager.java new file mode 100644 index 0000000..13524bd --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/MRTopicManager.java @@ -0,0 +1,183 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client; + +import java.io.IOException; +import java.util.Set; + +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; + + +/** + * A client for working with topic metadata. + * @author author + */ +public interface MRTopicManager extends MRClient +{ + /** + * Get the topics available in the cluster + * @return a set of topic names + * @throws IOException + */ + Set<String> getTopics () throws IOException; + + /** + * Information about a topic. + */ + public interface TopicInfo + { + /** + * Get the owner of the topic + * @return the owner, or null if no entry + */ + String getOwner (); + + /** + * Get the description for this topic + * @return the description, or null if no entry + */ + String getDescription (); + + /** + * Get the set of allowed producers (as API keys) on this topic + * @return the set of allowed producers, null of no ACL exists/enabled + */ + Set<String> getAllowedProducers (); + + /** + * Get the set of allowed consumers (as API keys) on this topic + * @return the set of allowed consumers, null of no ACL exists/enabled + */ + Set<String> getAllowedConsumers (); + } + + /** + * Get information about a topic. + * @param topic + * @return topic information + * @throws IOException + * @throws HttpObjectNotFoundException + */ + TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException; + + /** + * Create a new topic. + * @param topicName + * @param topicDescription + * @param partitionCount + * @param replicationCount + * @throws HttpException + * @throws IOException + */ + void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException; + + /** + * Delete the topic. This call must be authenticated and the API key listed as owner on the topic. + * NOTE: The MR (UEB) API server does not support topic deletion at this time (mid 2015) + * @param topic + * @throws HttpException + * @throws IOException + * @deprecated If/when the Kafka system supports topic delete, or the implementation changes, this will be restored. + */ + @Deprecated + void deleteTopic ( String topic ) throws HttpException, IOException; + + /** + * Can any client produce events into this topic without authentication? + * @param topic + * @return true if the topic is open for producing + * @throws IOException + * @throws HttpObjectNotFoundException + */ + boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException; + + /** + * Get the set of allowed producers. If the topic is open, the result is null. + * @param topic + * @return a set of allowed producers or null + * @throws IOException + * @throws HttpObjectNotFoundException + */ + Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException; + + /** + * Allow the given API key to produce messages on the given topic. The caller must + * own this topic. + * @param topic + * @param apiKey + * @throws HttpException + * @throws HttpObjectNotFoundException + * @throws IOException + */ + void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException; + + /** + * Revoke the given API key's authorization to produce messages on the given topic. + * The caller must own this topic. + * @param topic + * @param apiKey + * @throws HttpException + * @throws IOException + */ + void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException; + + /** + * Can any client consume events from this topic without authentication? + * @param topic + * @return true if the topic is open for consuming + * @throws IOException + * @throws HttpObjectNotFoundException + */ + boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException; + + /** + * Get the set of allowed consumers. If the topic is open, the result is null. + * @param topic + * @return a set of allowed consumers or null + * @throws IOException + * @throws HttpObjectNotFoundException + */ + Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException; + + /** + * Allow the given API key to consume messages on the given topic. The caller must + * own this topic. + * @param topic + * @param apiKey + * @throws HttpException + * @throws HttpObjectNotFoundException + * @throws IOException + */ + void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException; + + /** + * Revoke the given API key's authorization to consume messages on the given topic. + * The caller must own this topic. + * @param topic + * @param apiKey + * @throws HttpException + * @throws IOException + */ + void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException; +} + diff --git a/src/main/java/com/att/nsa/mr/client/impl/Clock.java b/src/main/java/com/att/nsa/mr/client/impl/Clock.java new file mode 100644 index 0000000..ace791e --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/Clock.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +public class Clock +{ + public synchronized static Clock getIt () + { + if ( sfClock == null ) + { + sfClock = new Clock (); + } + return sfClock; + } + + /** + * Get the system's current time in milliseconds. + * @return the current time + */ + public static long now () + { + return getIt().nowImpl (); + } + + /** + * Get current time in milliseconds + * @return current time in ms + */ + protected long nowImpl () + { + return System.currentTimeMillis (); + } + + protected Clock () + { + } + + private static Clock sfClock = null; + + protected synchronized static void register ( Clock testClock ) + { + sfClock = testClock; + } +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java new file mode 100644 index 0000000..a9fd9bd --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java @@ -0,0 +1,393 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import org.apache.http.HttpException; +import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; +import org.glassfish.jersey.internal.util.Base64; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.apiClient.http.CacheUse; +import com.att.nsa.apiClient.http.HttpClient; +import com.att.nsa.mr.client.MRClient; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +//import com.fasterxml.jackson.core.JsonProcessingException; + +public class MRBaseClient extends HttpClient implements MRClient +{ + + private static final String MR_AUTH_CONSTANT = "X-CambriaAuth"; + private static final String MR_DATE_CONSTANT = "X-CambriaDate"; + + protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException + { + super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort ); + } + + protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException { + super ( ConnectionType.HTTP,hosts, stdSvcPort); + + fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + } + + protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException + { + super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); + + fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + } + + + @Override + public void close () + { + } + + protected Set<String> jsonArrayToSet ( JSONArray a ) + { + if ( a == null ) return null; + + final TreeSet<String> set = new TreeSet<String> (); + for ( int i=0; i<a.length (); i++ ) + { + set.add ( a.getString ( i )); + } + return set; + } + + public void logTo ( Logger log ) + { + fLog = log; + replaceLogger ( log ); + } + + protected Logger getLog () + { + return fLog; + } + + private Logger fLog; + + public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + if ((null != username && null != password)) { + WebTarget target = null; + + Response response = null; + + target = getTarget(path, username, password); + String encoding = Base64.encodeAsString(username+":"+password); + + + response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); + + return getResponseDataInJson(response); + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + } + } + public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + String responseData = null; + if ((null != username && null != password)) { + WebTarget target = null; + + Response response = null; + + target = getTarget(path, username, password); + String encoding = Base64.encodeAsString(username+":"+password); + + + response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + } + } + public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + if ((null != username && null != password)) { + WebTarget target = null; + + Response response = null; + target= getTarget(path,username, password); + response = target.request() + .header(MR_AUTH_CONSTANT, authKey) + .header(MR_DATE_CONSTANT, authDate) + .post(Entity.entity(data, contentType)); + + return getResponseDataInJson(response); + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + } + } + public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ + String responseData = null; + if ((null != username && null != password)) { + WebTarget target = null; + + Response response = null; + target= getTarget(path,username, password); + response = target.request() + .header(MR_AUTH_CONSTANT, authKey) + .header(MR_DATE_CONSTANT, authDate) + .post(Entity.entity(data, contentType)); + responseData = response.readEntity(String.class); + return responseData; + + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + } + } + + + public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + if (null != username && null != password) { + + WebTarget target = null; + + Response response = null; + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + target=getTarget(path); + response = target.request() + .header(MR_AUTH_CONSTANT, username) + .header(MR_DATE_CONSTANT, password) + .get(); + } else { + target = getTarget(path, username, password); + String encoding = Base64.encodeAsString(username+":"+password); + + response = target.request().header("Authorization", "Basic " + encoding).get(); + + } + return getResponseDataInJson(response); + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + + public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + if (null != username && null != password) { + + WebTarget target = null; + + Response response = null; + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + target=getTarget(path); + response = target.request() + .header(MR_AUTH_CONSTANT, username) + .header(MR_DATE_CONSTANT, password) + .get(); + } else { + target = getTarget(path, username, password); + String encoding = Base64.encodeAsString(username+":"+password); + response = target.request().header("Authorization", "Basic " + encoding).get(); + } + MRClientFactory.HTTPHeadersMap=response.getHeaders(); + + String transactionid=response.getHeaderString("transactionid"); + if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + if (null != username && null != password) { + + WebTarget target = null; + + Response response = null; + target=getTarget(path, username, password); + response = target.request() + .header(MR_AUTH_CONSTANT, authKey) + .header(MR_DATE_CONSTANT, authDate) + .get(); + + return getResponseDataInJson(response); + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + if (null != username && null != password) { + + WebTarget target = null; + + Response response = null; + target=getTarget(path, username, password); + response = target.request() + .header(MR_AUTH_CONSTANT, authKey) + .header(MR_DATE_CONSTANT, authDate) + .get(); + + MRClientFactory.HTTPHeadersMap=response.getHeaders(); + + String transactionid=response.getHeaderString("transactionid"); + if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + } + } + + private WebTarget getTarget(final String path, final String username, final String password) { + + Client client = ClientBuilder.newClient(); + + + // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types. + HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); + client.register(feature); + + return client.target(path); + } + + + private WebTarget getTarget(final String path) { + + Client client = ClientBuilder.newClient(); + return client.target(path); + } + private JSONObject getResponseDataInJson(Response response) throws JSONException { + try { + MRClientFactory.HTTPHeadersMap=response.getHeaders(); + // fLog.info("DMAAP response status: " + response.getStatus()); + + + //MultivaluedMap<String, Object> headersMap = response.getHeaders(); + //for(String key : headersMap.keySet()) { + String transactionid=response.getHeaderString("transactionid"); + if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { + fLog.info("TransactionId : " + transactionid); + } + + /*final String responseData = response.readEntity(String.class); + JSONTokener jsonTokener = new JSONTokener(responseData); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else { + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject;*/ + + + if(response.getStatus()==403) { + JSONObject jsonObject = null; + jsonObject = new JSONObject(); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(response.getEntity()); + jsonObject.put("result", jsonArray); + jsonObject.put("status", response.getStatus()); + return jsonObject; + } + String responseData = response.readEntity(String.class); + + JSONTokener jsonTokener = new JSONTokener(responseData); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + jsonObject.put("status", response.getStatus()); + } else { + jsonObject = new JSONObject(jsonTokener); + jsonObject.put("status", response.getStatus()); + } + + return jsonObject; + } catch (JSONException excp) { + fLog.error("DMAAP - Error reading response data.", excp); + return null; + } + + } + + public String getHTTPErrorResponseMessage(String responseString){ + + String response = null; + int beginIndex = 0; + int endIndex = 0; + if(responseString.contains("<body>")){ + + beginIndex = responseString.indexOf("body>")+5; + endIndex = responseString.indexOf("</body"); + response = responseString.substring(beginIndex,endIndex); + } + + return response; + + } + + public String getHTTPErrorResponseCode(String responseString){ + + String response = null; + int beginIndex = 0; + int endIndex = 0; + if(responseString.contains("<title>")){ + beginIndex = responseString.indexOf("title>")+6; + endIndex = responseString.indexOf("</title"); + response = responseString.substring(beginIndex,endIndex); + } + + return response; + } + +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java new file mode 100644 index 0000000..597e4da --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java @@ -0,0 +1,485 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.zip.GZIPOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.apiClient.http.HttpClient; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; + +/** + * This is a batching publisher class that allows the client to publish messages + * in batches that are limited in terms of size and/or hold time. + * + * @author author + * @deprecated This class's tricky locking doesn't quite work + * + */ +@Deprecated +public class MRBatchPublisher implements MRBatchingPublisher +{ + public static final long kMinMaxAgeMs = 1; + + /** + * Create a batch publisher. + * + * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path. + * @param topic the topic to publish to + * @param maxBatchSize the maximum size of a batch + * @param maxAgeMs the maximum age of a batch + */ + public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) + { + if ( maxAgeMs < kMinMaxAgeMs ) + { + fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs ); + maxAgeMs = kMinMaxAgeMs; + } + + try { + fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress ); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + + // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for + // the oldest msg to hit max age? (locking is complicated, but should be do-able) + fExec = new ScheduledThreadPoolExecutor ( 1 ); + fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS ); + } + + @Override + public void setApiCredentials ( String apiKey, String apiSecret ) + { + fSender.setApiCredentials ( apiKey, apiSecret ); + } + + @Override + public void clearApiCredentials () + { + fSender.clearApiCredentials (); + } + + /** + * Send the given message with the given partition + * @param partition + * @param msg + * @throws IOException + */ + @Override + public int send ( String partition, String msg ) throws IOException + { + return send ( new message ( partition, msg ) ); + } + @Override + public int send ( String msg ) throws IOException + { + return send ( new message ( "",msg ) ); + } + /** + * Send the given message + * @param userMsg a message + * @throws IOException + */ + @Override + public int send ( message userMsg ) throws IOException + { + final LinkedList<message> list = new LinkedList<message> (); + list.add ( userMsg ); + return send ( list ); + } + + /** + * Send the given set of messages + * @param msgs the set of messages, sent in order of iteration + * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing) + * @throws IOException + */ + @Override + public int send ( Collection<message> msgs ) throws IOException + { + if ( msgs.size() > 0 ) + { + fSender.queue ( msgs ); + } + return fSender.size (); + } + + @Override + public int getPendingMessageCount () + { + return fSender.size (); + } + + /** + * Send any pending messages and close this publisher. + * @throws IOException + * @throws InterruptedException + */ + @Override + public void close () + { + try + { + final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); + if ( remains.size() > 0 ) + { + fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. " + + "(Consider using the alternate close method to capture unsent messages in this case.)" ); + } + } + catch ( InterruptedException e ) + { + fLog.warn ( "Possible message loss. " + e.getMessage(), e ); + } + catch ( IOException e ) + { + fLog.warn ( "Possible message loss. " + e.getMessage(), e ); + } + } + + public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException + { + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); + fExec.shutdown (); + + final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); + final long timeoutAtMs = System.currentTimeMillis () + waitInMs; + while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 ) + { + fSender.checkSend ( true ); + Thread.sleep ( 250 ); + } + + final LinkedList<message> result = new LinkedList<message> (); + fSender.drainTo ( result ); + return result; + } + + private final ScheduledThreadPoolExecutor fExec; + private final Sender fSender; + + private static class TimestampedMessage extends message + { + public TimestampedMessage ( message m ) + { + super ( m ); + timestamp = System.currentTimeMillis (); + } + public final long timestamp; + } + + private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class ); + + private class Sender extends MRBaseClient implements Runnable + { + public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException + { + super ( baseUrls ); + + fNextBatch = new LinkedList<TimestampedMessage> (); + fSendingBatch = null; + fTopic = topic; + fMaxBatchSize = maxBatch; + fMaxAgeMs = maxAgeMs; + fCompress = compress; + fLock = new ReentrantReadWriteLock (); + fWriteLock = fLock.writeLock (); + fReadLock = fLock.readLock (); + fDontSendUntilMs = 0; + } + + public void drainTo ( LinkedList<message> list ) + { + fWriteLock.lock (); + try + { + if ( fSendingBatch != null ) + { + list.addAll ( fSendingBatch ); + } + list.addAll ( fNextBatch ); + + fSendingBatch = null; + fNextBatch.clear (); + } + finally + { + fWriteLock.unlock (); + } + } + + /** + * Called periodically by the background executor. + */ + @Override + public void run () + { + try + { + checkSend ( false ); + } + catch ( IOException e ) + { + fLog.warn ( "MR background send: " + e.getMessage () ); + } + } + + public int size () + { + fReadLock.lock (); + try + { + return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () ); + } + finally + { + fReadLock.unlock (); + } + } + + /** + * Called to queue a message. + * @param m + * @throws IOException + */ + public void queue ( Collection<message> msgs ) throws IOException + { + fWriteLock.lock (); + try + { + for ( message userMsg : msgs ) + { + if ( userMsg != null ) + { + fNextBatch.add ( new TimestampedMessage ( userMsg ) ); + } + else + { + fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." ); + } + } + } + finally + { + fWriteLock.unlock(); + } + checkSend ( false ); + } + + /** + * Send a batch if the queue is long enough, or the first pending message is old enough. + * @param force + * @throws IOException + */ + public void checkSend ( boolean force ) throws IOException + { + // hold a read lock just long enough to evaluate whether a batch + // should be sent + boolean shouldSend = false; + fReadLock.lock (); + try + { + if ( fNextBatch.size () > 0 ) + { + final long nowMs = System.currentTimeMillis (); + shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize ); + if ( !shouldSend ) + { + final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, unless forced, wait after an error + shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs ); + } + // else: even in 'force', there's nothing to send, so shouldSend=false is fine + } + finally + { + fReadLock.unlock (); + } + + // if a send is required, acquire a write lock, swap out the next batch, + // swap in a fresh batch, and release the lock for the caller to start + // filling a batch again. After releasing the lock, send the current + // batch. (There could be more messages added between read unlock and + // write lock, but that's fine.) + if ( shouldSend ) + { + fSendingBatch = null; + + fWriteLock.lock (); + try + { + fSendingBatch = fNextBatch; + fNextBatch = new LinkedList<TimestampedMessage> (); + } + finally + { + fWriteLock.unlock (); + } + + if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) ) + { + fLog.warn ( "Send failed, rebuilding send queue." ); + + // note the time for back-off + fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis (); + + // the send failed. reconstruct the pending queue + fWriteLock.lock (); + try + { + final LinkedList<TimestampedMessage> nextGroup = fNextBatch; + fNextBatch = fSendingBatch; + fNextBatch.addAll ( nextGroup ); + fSendingBatch = null; + fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." ); + } + finally + { + fWriteLock.unlock (); + } + } + else + { + fWriteLock.lock (); + try + { + fSendingBatch = null; + } + finally + { + fWriteLock.unlock (); + } + } + } + } + + private LinkedList<TimestampedMessage> fNextBatch; + private LinkedList<TimestampedMessage> fSendingBatch; + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxAgeMs; + private final boolean fCompress; + private final ReentrantReadWriteLock fLock; + private final WriteLock fWriteLock; + private final ReadLock fReadLock; + private long fDontSendUntilMs; + private static final long sfWaitAfterError = 1000; + } + + // this is static so that it's clearly not using any mutable member data outside of a lock + private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log ) + { + // it's possible for this call to be made with an empty list. in this case, just return. + if ( toSend.size() < 1 ) + { + return true; + } + + final long nowMs = System.currentTimeMillis (); + final String url = MRConstants.makeUrl ( topic ); + + log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" ); + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); + try + { + OutputStream os = baseStream; + if ( compress ) + { + os = new GZIPOutputStream ( baseStream ); + } + for ( TimestampedMessage m : toSend ) + { + os.write ( ( "" + m.fPartition.length () ).getBytes() ); + os.write ( '.' ); + os.write ( ( "" + m.fMsg.length () ).getBytes() ); + os.write ( '.' ); + os.write ( m.fPartition.getBytes() ); + os.write ( m.fMsg.getBytes() ); + os.write ( '\n' ); + } + os.close (); + } + catch ( IOException e ) + { + log.warn ( "Problem writing stream to post: " + e.getMessage () ); + return false; + } + + boolean result = false; + final long startMs = System.currentTimeMillis (); + try + { + client.post ( url, compress ? + MRFormat.CAMBRIA_ZIP.toString () : + MRFormat.CAMBRIA.toString (), + baseStream.toByteArray(), false ); + result = true; + } + catch ( HttpException e ) + { + log.warn ( "Problem posting to MR: " + e.getMessage() ); + } + catch ( IOException e ) + { + log.warn ( "Problem posting to MR: " + e.getMessage() ); + } + + log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" ); + return result; + } + + @Override + public void logTo ( Logger log ) + { + fLog = log; + } + + @Override + public MRPublisherResponse sendBatchWithResponse() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java b/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java new file mode 100644 index 0000000..fa9207d --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java @@ -0,0 +1,54 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class MRClientVersionInfo +{ + public static String getVersion () + { + return version; + } + + private static final Properties props = new Properties(); + private static final String version; + static + { + String use = null; + try + { + final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" ); + if ( is != null ) + { + props.load ( is ); + use = props.getProperty ( "MRClientVersion", null ); + } + } + catch ( IOException e ) + { + } + version = use; + } +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java b/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java new file mode 100644 index 0000000..803670c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java @@ -0,0 +1,180 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.http.HttpHost; + +class MRConstants +{ + private static final String PROTOCOL = "http"; + public static final String context = "/"; + public static final String kBasePath = "events/"; + //public static final int kStdMRServicePort = 3904; + public static final int kStdMRServicePort = 8080; + + public static String escape ( String s ) + { + try + { + return URLEncoder.encode ( s, "UTF-8"); + } + catch ( UnsupportedEncodingException e ) + { + throw new RuntimeException ( e ); + } + } + + public static String makeUrl ( String rawTopic ) + { + final String cleanTopic = escape ( rawTopic ); + + final StringBuffer url = new StringBuffer(). + append ( MRConstants.context ). + append ( MRConstants.kBasePath ). + append ( cleanTopic ); + return url.toString (); + } + + public static String makeUrl ( final String host, final String rawTopic ) + { + final String cleanTopic = escape ( rawTopic ); + + final StringBuffer url = new StringBuffer(); + + if (!host.startsWith("http") || !host.startsWith("https") ) { + url.append( PROTOCOL + "://" ); + } + url.append(host); + url.append ( MRConstants.context ); + url.append ( MRConstants.kBasePath ); + url.append ( cleanTopic ); + return url.toString (); + } + + public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion ) + { + final String cleanTopic = escape ( rawTopic ); + + final StringBuffer url = new StringBuffer(); + + if (transferprotocol !=null && !transferprotocol.equals("")) { + url.append( transferprotocol + "://" ); + }else{ + url.append( PROTOCOL + "://" ); + } + url.append(host); + url.append ( MRConstants.context ); + url.append ( MRConstants.kBasePath ); + url.append ( cleanTopic ); + if(parttion!=null && !parttion.equalsIgnoreCase("")) + url.append("?partitionKey=").append(parttion); + return url.toString (); + } + public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) + { + final String cleanConsumerGroup = escape ( rawConsumerGroup ); + final String cleanConsumerId = escape ( rawConsumerId ); + return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; + } + + /** + * Create a list of HttpHosts from an input list of strings. Input strings have + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param hosts + * @return a list of hosts + */ + public static List<HttpHost> createHostsList(Collection<String> hosts) + { + final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); + for ( String host : hosts ) + { + if ( host.length () == 0 ) continue; + convertedHosts.add ( hostForString ( host ) ); + } + return convertedHosts; + } + + /** + * Return an HttpHost from an input string. Input string has + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param hosts + * @return a list of hosts + */ + public static HttpHost hostForString ( String host ) + { + if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); + + String hostPart = host; + int port = kStdMRServicePort; + + final int colon = host.indexOf ( ':' ); + if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); + if ( colon > 0 ) + { + hostPart = host.substring ( 0, colon ).trim(); + + final String portPart = host.substring ( colon + 1 ).trim(); + if ( portPart.length () > 0 ) + { + try + { + port = Integer.parseInt ( portPart ); + } + catch ( NumberFormatException x ) + { + throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x ); + } + } + // else: use default port on "foo:" + } + + return new HttpHost ( hostPart, port ); + } + + public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) { + final String cleanConsumerGroup = escape ( fGroup ); + final String cleanConsumerId = escape ( fId ); + + StringBuffer url = new StringBuffer(); + + if (transferprotocol !=null && !transferprotocol.equals("")) { + url.append( transferprotocol + "://" ); + }else{ + url.append( PROTOCOL + "://" ); + } + + url.append(host); + url.append(context); + url.append(kBasePath); + url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId); + + return url.toString(); + } +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java new file mode 100644 index 0000000..fdd1b89 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -0,0 +1,709 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.http.HttpException; +import org.apache.http.HttpStatus; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.aft.dme2.api.DME2Client; +import com.att.aft.dme2.api.DME2Exception; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +public class MRConsumerImpl extends MRBaseClient implements MRConsumer +{ + + private static final String SUCCESS_MESSAGE = "Success"; + + + private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + public static List<String> stringToList ( String str ) + { + final LinkedList<String> set = new LinkedList<String> (); + if ( str != null ) + { + final String[] parts = str.trim ().split ( "," ); + for ( String part : parts ) + { + final String trimmed = part.trim(); + if ( trimmed.length () > 0 ) + { + set.add ( trimmed ); + } + } + } + return set; + } + + public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException + { + this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false ); + } + + public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException + { + super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId ); + + fTopic = topic; + fGroup = consumerGroup; + fId = consumerId; + fTimeoutMs = timeoutMs; + fLimit = limit; + fFilter = filter; + + //setApiCredentials ( apiKey, apiSecret ); + } + + @Override + public Iterable<String> fetch () throws IOException,Exception + { + // fetch with the timeout and limit set in constructor + return fetch ( fTimeoutMs, fLimit ); + } + + @Override + public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception + { + final LinkedList<String> msgs = new LinkedList<String> (); + +// FIXME: the timeout on the socket needs to be at least as long as the long poll +// // sanity check for long poll timeout vs. socket read timeout +// final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10; +// if ( timeoutMs > maxReasonableTimeoutMs ) +// { +// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" + +// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." ); +// timeoutMs = maxReasonableTimeoutMs; +// } + + // final String urlPath = createUrlPath ( timeoutMs, limit ); + + //getLog().info ( "UEB GET " + urlPath ); + try + { + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + DMEConfigure(timeoutMs, limit); + try + { + //getLog().info ( "Receiving msgs from: " + url+subContextPath ); + String reply = sender.sendAndWait(timeoutMs+10000L); + // System.out.println("Message received = "+reply); + final JSONObject o =getResponseDataInJson(reply); + //msgs.add(reply); + if ( o != null ) + { + final JSONArray a = o.getJSONArray ( "result" ); + // final int b = o.getInt("status" ); + //if ( a != null && a.length()>0 ) + if ( a != null) + { + for ( int i=0; i<a.length (); i++ ) + { + //msgs.add("DMAAP response status: "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add ( a.getString(i) ); + else + msgs.add ( a.getJSONObject(i).toString() ); + + + } + } +// else if(a != null && a.length()<1){ +// msgs.add ("[]"); +// } + } + } + catch ( JSONException e ) + { + // unexpected response + reportProblemWithResponse (); + } + catch ( HttpException e ) + { + throw new IOException ( e ); + } + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit ); + + + try + { + final JSONObject o = get ( urlPath, username, password, protocolFlag ); + + if ( o != null ) + { + final JSONArray a = o.getJSONArray ( "result" ); + final int b = o.getInt("status" ); + //if ( a != null && a.length()>0 ) + if ( a != null) + { + for ( int i=0; i<a.length (); i++ ) + { + msgs.add("DMAAP response status: "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add ( a.getString(i) ); + else + msgs.add ( a.getJSONObject(i).toString() ); + + } + } +// else if(a != null && a.length()<1) +// { +// msgs.add ("[]"); +// } + } + } + catch ( JSONException e ) + { + // unexpected response + reportProblemWithResponse (); + } + catch ( HttpException e ) + { + throw new IOException ( e ); + } + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit ); + + + try + { + final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag ); + if ( o != null ) + { + final JSONArray a = o.getJSONArray ( "result" ); + final int b = o.getInt("status" ); + //if ( a != null && a.length()>0) + if ( a != null) + { + for ( int i=0; i<a.length (); i++ ) + { + msgs.add("DMAAP response status: "+Integer.toString(b)); + if (a.get(i) instanceof String) + msgs.add ( a.getString(i) ); + else + msgs.add ( a.getJSONObject(i).toString() ); + + } + } +// else if(a != null && a.length()<1){ +// msgs.add ("[]"); +// } + } + } + catch ( JSONException e ) + { + // unexpected response + reportProblemWithResponse (); + } + catch ( HttpException e ) + { + throw new IOException ( e ); + } + + } + + } catch ( JSONException e ) { + // unexpected response + reportProblemWithResponse (); + } catch (HttpException e) { + throw new IOException(e); + } catch (Exception e ) { + throw e; + } + + + return msgs; + } + + private JSONObject getResponseDataInJson(String response) { + try { + + + //fLog.info("DMAAP response status: " + response.getStatus()); + + // final String responseData = response.readEntity(String.class); + JSONTokener jsonTokener = new JSONTokener(response); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else { + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject; + } catch (JSONException excp) { + // fLog.error("DMAAP - Error reading response data.", excp); + return null; + } + + + +} + + private JSONObject getResponseDataInJsonWithResponseReturned(String response) { + JSONTokener jsonTokener = new JSONTokener(response); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if(null != response && response.length()==0){ + return null; + } + + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else if('{' == firstChar){ + return null; + } else if('<' == firstChar){ + return null; + }else{ + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject; + + } + + private final String fTopic; + private final String fGroup; + private final String fId; + private final int fTimeoutMs; + private final int fLimit; + private String fFilter; + private String username; + private String password; + private String host; + private String latitude; + private String longitude; + private String version; + private String serviceName; + private String env; + private String partner; + private String routeOffer; + private String subContextPath; + private String protocol; + private String methodType; + private String url; + private String dmeuser; + private String dmepassword; + private String contenttype; + private DME2Client sender; + public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + public String consumerFilePath; + private String authKey; + private String authDate; + private Properties props; + private HashMap<String, String> DMETimeOuts; + private String handlers; + public static String routerFilePath; + public static String getRouterFilePath() { + return routerFilePath; + } + + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } + public String getConsumerFilePath() { + return consumerFilePath; + } + + public void setConsumerFilePath(String consumerFilePath) { + this.consumerFilePath = consumerFilePath; + } + + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ + latitude = props.getProperty("Latitude"); + longitude = props.getProperty("Longitude"); + version = props.getProperty("Version"); + serviceName = props.getProperty("ServiceName"); + env = props.getProperty("Environment"); + partner = props.getProperty("Partner"); + routeOffer = props.getProperty("routeOffer"); + + subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId; + // subContextPath=createUrlPath (subContextPath, timeoutMs, limit); + //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs); + + protocol = props.getProperty("Protocol"); + methodType = props.getProperty("MethodType"); + dmeuser = props.getProperty("username"); + dmepassword = props.getProperty("password"); + contenttype = props.getProperty("contenttype"); + handlers = props.getProperty("sessionstickinessrequired"); + //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner; + // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner; + + /** + * Changes to DME2Client url to use Partner for auto failover between data centers + * When Partner value is not provided use the routeOffer value for auto failover within a cluster + */ + + String preferredRouteKey = readRoute("preferredRouteKey"); + + if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) + { + url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; + }else if (partner != null && !partner.isEmpty()) + { + url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; + } + else if (routeOffer!=null && !routeOffer.isEmpty()) + { + url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; + } + + //fLog.info("url :"+url); + + if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs; + if(limit != -1 )url=url+"&limit="+limit; + + DMETimeOuts = new HashMap<String, String>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("Content-Type", contenttype); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); + // System.setProperty("DME2.DEBUG", "true"); + + //SSL changes + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + //SSL changes + + sender = new DME2Client(new URI(url), timeoutMs+10000L); + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + if(dmeuser != null && dmepassword != null){ + sender.setCredentials(dmeuser, dmepassword); + //System.out.println(dmepassword); + } + sender.setHeaders(DMETimeOuts); + sender.setPayload(""); + + if(handlers.equalsIgnoreCase("yes")){ + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + }else{ + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } + /* HeaderReplyHandler headerhandler= new HeaderReplyHandler(); + sender.setReplyHandler(headerhandler);*/ +// } catch (DME2Exception x) { +// getLog().warn(x.getMessage(), x); +// System.out.println("XXXXXXXXXXXX"+x); +// } catch (URISyntaxException x) { +// System.out.println(x); +// getLog().warn(x.getMessage(), x); +// } catch (Exception x) { +// System.out.println("XXXXXXXXXXXX"+x); +// getLog().warn(x.getMessage(), x); +// } + } + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + } + + protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException + { + final StringBuffer contexturl= new StringBuffer(url); + // final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) ); + final StringBuffer adds = new StringBuffer (); + if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); + if ( limit > -1 ) + { + if ( adds.length () > 0 ) + { + adds.append ( "&" ); + } + adds.append ( "limit=" ).append ( limit ); + } + if ( fFilter != null && fFilter.length () > 0 ) + { + try { + if ( adds.length () > 0 ) + { + adds.append ( "&" ); + } + adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage() + "....say whaaaat?!"); + } + } + if ( adds.length () > 0 ) + { + contexturl.append ( "?" ).append ( adds.toString () ); + } + + //sender.setSubContext(url.toString()); + return contexturl.toString (); + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getAuthKey() { + return authKey; + } + + public void setAuthKey(String authKey) { + this.authKey = authKey; + } + + public String getAuthDate() { + return authDate; + } + + public void setAuthDate(String authDate) { + this.authDate = authDate; + } + + public String getfFilter() { + return fFilter; + } + + public void setfFilter(String fFilter) { + this.fFilter = fFilter; + } + + private String readRoute(String routeKey) { + + try { + + MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath))); + + } catch (Exception ex) { + fLog.error("Reply Router Error " + ex.toString() ); + } + String routeOffer = MRClientFactory.prop.getProperty(routeKey); + return routeOffer; + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse() { + + // fetch with the timeout and limit set in constructor + return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit); + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, + int limit) { + final LinkedList<String> msgs = new LinkedList<String>(); + MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); + try { + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase( + protocolFlag)) { + DMEConfigure(timeoutMs, limit); + + String reply = sender.sendAndWait(timeoutMs + 10000L); + + final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } + } + + } + createMRConsumerResponse(reply, mrConsumerResponse); + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase( + protocolFlag)) { + final String urlPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, + props.getProperty("Protocol")), timeoutMs, + limit); + + String response = getResponse(urlPath, username, password, + protocolFlag); + + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } + } + + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase( + protocolFlag)) { + final String urlPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, + props.getProperty("Protocol")), timeoutMs, + limit); + + String response = getAuthResponse(urlPath, authKey, authDate, + username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + + } + } + + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + + + } catch (JSONException e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + } catch (HttpException e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + }catch(DME2Exception e){ + mrConsumerResponse.setResponseCode(e.getErrorCode()); + mrConsumerResponse.setResponseMessage(e.getErrorMessage()); + }catch (Exception e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + } + mrConsumerResponse.setActualMessages(msgs); + return mrConsumerResponse; + } + + private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { + + if(reply.startsWith("{")){ + JSONObject jObject = new JSONObject(reply); + String message = jObject.getString("message"); + int status = jObject.getInt("status"); + + mrConsumerResponse.setResponseCode(Integer.toString(status)); + + if(null != message){ + mrConsumerResponse.setResponseMessage(message); + } + }else if (reply.startsWith("<")){ + mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply)); + mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + }else{ + mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); + } + + } + + +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java b/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java new file mode 100644 index 0000000..09a317b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +enum MRFormat +{ + /** + * Messages are sent using MR's message format. + */ + CAMBRIA + { + public String toString() { return "application/cambria"; } + }, + + /** + * Messages are sent using MR's message format with compression. + */ + CAMBRIA_ZIP + { + public String toString() { return "application/cambria-zip"; } + }, + + /** + * messages are sent as simple JSON objects. + */ + JSON + { + public String toString() { return "application/json"; } + } +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java new file mode 100644 index 0000000..d32a7ef --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java @@ -0,0 +1,260 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.Set; +import java.util.TreeSet; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; +import com.att.nsa.mr.client.MRIdentityManager; +import com.att.nsa.mr.client.MRTopicManager; + +public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager +{ + public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException + { + super ( baseUrls ); + } + + @Override + public Set<String> getTopics () throws IOException + { + final TreeSet<String> set = new TreeSet<String> (); + try + { + final JSONObject topicSet = get ( "/topics" ); + final JSONArray a = topicSet.getJSONArray ( "topics" ); + for ( int i=0; i<a.length (); i++ ) + { + set.add ( a.getString ( i ) ); + } + } + catch ( HttpObjectNotFoundException e ) + { + getLog().warn ( "No /topics endpoint on service." ); + } + catch ( JSONException e ) + { + getLog().warn ( "Bad /topics result from service." ); + } + catch ( HttpException e ) + { + throw new IOException ( e ); + } + return set; + } + + @Override + public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException + { + try + { + final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) ); + return new TopicInfo () + { + @Override + public String getOwner () + { + return topicData.optString ( "owner", null ); + } + + @Override + public String getDescription () + { + return topicData.optString ( "description", null ); + } + + @Override + public Set<String> getAllowedProducers () + { + final JSONObject acl = topicData.optJSONObject ( "writerAcl" ); + if ( acl != null && acl.optBoolean ( "enabled", true ) ) + { + return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); + } + return null; + } + + @Override + public Set<String> getAllowedConsumers () + { + final JSONObject acl = topicData.optJSONObject ( "readerAcl" ); + if ( acl != null && acl.optBoolean ( "enabled", true ) ) + { + return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); + } + return null; + } + }; + } + catch ( JSONException e ) + { + throw new IOException ( e ); + } + catch ( HttpException e ) + { + throw new IOException ( e ); + } + } + + @Override + public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException + { + final JSONObject o = new JSONObject (); + o.put ( "topicName", topicName ); + o.put ( "topicDescription", topicDescription ); + o.put ( "partitionCount", partitionCount ); + o.put ( "replicationCount", replicationCount ); + post ( "/topics/create", o, false ); + } + + @Override + public void deleteTopic ( String topic ) throws HttpException, IOException + { + delete ( "/topics/" + MRConstants.escape ( topic ) ); + } + + @Override + public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException + { + return null == getAllowedProducers ( topic ); + } + + @Override + public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException + { + return getTopicMetadata ( topic ).getAllowedProducers (); + } + + @Override + public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException + { + put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() ); + } + + @Override + public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException + { + delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) ); + } + + @Override + public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException + { + return null == getAllowedConsumers ( topic ); + } + + @Override + public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException + { + return getTopicMetadata ( topic ).getAllowedConsumers (); + } + + @Override + public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException + { + put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() ); + } + + @Override + public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException + { + delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) ); + } + + @Override + public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException + { + try + { + final JSONObject o = new JSONObject (); + o.put ( "email", email ); + o.put ( "description", description ); + final JSONObject reply = post ( "/apiKeys/create", o, true ); + return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) ); + } + catch ( JSONException e ) + { + // the response doesn't meet our expectation + throw new MRApiException ( "The API key response is incomplete.", e ); + } + } + + @Override + public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException + { + final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) ); + if ( keyEntry == null ) + { + return null; + } + + return new ApiKey () + { + @Override + public String getEmail () + { + final JSONObject aux = keyEntry.optJSONObject ( "aux" ); + if ( aux != null ) + { + return aux.optString ( "email" ); + } + return null; + } + + @Override + public String getDescription () + { + final JSONObject aux = keyEntry.optJSONObject ( "aux" ); + if ( aux != null ) + { + return aux.optString ( "description" ); + } + return null; + } + }; + } + + @Override + public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException + { + final JSONObject o = new JSONObject (); + if ( email != null ) o.put ( "email", email ); + if ( description != null ) o.put ( "description", description ); + patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o ); + } + + @Override + public void deleteCurrentApiKey () throws HttpException, IOException + { + delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) ); + } +} diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java new file mode 100644 index 0000000..ad925b1 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -0,0 +1,939 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.http.HttpException; +import org.apache.http.HttpStatus; +import org.json.JSONArray; +import org.json.JSONObject; + +import com.att.aft.dme2.api.DME2Client; +import com.att.aft.dme2.api.DME2Exception; +import com.att.nsa.mr.client.HostSelector; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher +{ + public static class Builder + { + public Builder () + { + } + + public Builder againstUrls ( Collection<String> baseUrls ) + { + fUrls = baseUrls; + return this; + } + + public Builder onTopic ( String topic ) + { + fTopic = topic; + return this; + } + + public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs ) + { + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + return this; + } + + public Builder compress ( boolean compress ) + { + fCompress = compress; + return this; + } + + public Builder httpThreadTime ( int threadOccuranceTime ) + { + this.threadOccuranceTime = threadOccuranceTime; + return this; + } + + public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts ) + { + fAllowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public Builder withResponse ( boolean withResponse) + { + fWithResponse = withResponse; + return this; + } + public MRSimplerBatchPublisher build () + { + if(!fWithResponse) + { + try { + return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } else + { + try { + return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + } + + private Collection<String> fUrls; + private String fTopic; + private int fMaxBatchSize = 100; + private long fMaxBatchAgeMs = 1000; + private boolean fCompress = false; + private int threadOccuranceTime = 50; + private boolean fAllowSelfSignedCerts = false; + private boolean fWithResponse = false; + + }; + + @Override + public int send ( String partition, String msg ) + { + return send ( new message ( partition, msg ) ); + } + @Override + public int send ( String msg ) + { + return send ( new message ( null, msg ) ); + } + + + @Override + public int send ( message msg ) + { + final LinkedList<message> list = new LinkedList<message> (); + list.add ( msg ); + return send ( list ); + } + + + + @Override + public synchronized int send ( Collection<message> msgs ) + { + if ( fClosed ) + { + throw new IllegalStateException ( "The publisher was closed." ); + } + + for ( message userMsg : msgs ) + { + fPending.add ( new TimestampedMessage ( userMsg ) ); + } + return getPendingMessageCount (); + } + + @Override + public synchronized int getPendingMessageCount () + { + return fPending.size (); + } + + @Override + public void close () + { + try + { + final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); + if ( remains.size() > 0 ) + { + getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. " + + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." ); + } + } + catch ( InterruptedException e ) + { + getLog().warn ( "Possible message loss. " + e.getMessage(), e ); + } + catch ( IOException e ) + { + getLog().warn ( "Possible message loss. " + e.getMessage(), e ); + } + } + + @Override + public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException + { + synchronized ( this ) + { + fClosed = true; + + // stop the background sender + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); + fExec.shutdown (); + } + + final long now = Clock.now (); + final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); + final long timeoutAtMs = now + waitInMs; + + while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 ) + { + send ( true ); + Thread.sleep ( 250 ); + } + + synchronized ( this ) + { + final LinkedList<message> result = new LinkedList<message> (); + fPending.drainTo ( result ); + return result; + } + } + + /** + * Possibly send a batch to the MR server. This is called by the background thread + * and the close() method + * + * @param force + */ + private synchronized void send ( boolean force ) + { + if ( force || shouldSendNow () ) + { + if ( !sendBatch () ) + { + getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + + // note the time for back-off + fDontSendUntilMs = sfWaitAfterError + Clock.now (); + } + } + } + + private synchronized boolean shouldSendNow () + { + boolean shouldSend = false; + if ( fPending.size () > 0 ) + { + final long nowMs = Clock.now (); + + shouldSend = ( fPending.size() >= fMaxBatchSize ); + if ( !shouldSend ) + { + final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, wait after an error + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + } + return shouldSend; + } + + private synchronized boolean sendBatch () + { + // it's possible for this call to be made with an empty list. in this case, just return. + if ( fPending.size() < 1 ) + { + return true; + } + + final long nowMs = Clock.now (); + + host = this.fHostSelector.selectBaseHost(); + + final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); + + + try + { + /*final String contentType = + fCompress ? + MRFormat.CAMBRIA_ZIP.toString () : + MRFormat.CAMBRIA.toString () + ;*/ + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); + OutputStream os = baseStream; + final String contentType = props.getProperty("contenttype"); + if(contentType.equalsIgnoreCase("application/json")){ + JSONArray jsonArray = new JSONArray(); + for ( TimestampedMessage m : fPending ) + { + JSONObject jsonObject = new JSONObject(m.fMsg); + + jsonArray.put(jsonObject); + + } + os.write (jsonArray.toString().getBytes() ); + os.close(); + + }else if (contentType.equalsIgnoreCase("text/plain")){ + for ( TimestampedMessage m : fPending ) + { + os.write ( m.fMsg.getBytes() ); + os.write ( '\n' ); + } + os.close (); + } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ + if ( contentType.equalsIgnoreCase("application/cambria-zip") ) + { + os = new GZIPOutputStream ( baseStream ); + } + for ( TimestampedMessage m : fPending ) + { + + os.write ( ( "" + m.fPartition.length () ).getBytes() ); + os.write ( '.' ); + os.write ( ( "" + m.fMsg.length () ).getBytes() ); + os.write ( '.' ); + os.write ( m.fPartition.getBytes() ); + os.write ( m.fMsg.getBytes() ); + os.write ( '\n' ); + } + os.close (); + }else{ + for ( TimestampedMessage m : fPending ) + { + os.write ( m.fMsg.getBytes() ); + + } + os.close (); + } + + + + final long startMs = Clock.now (); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + + DME2Configue(); + + Thread.sleep(5); + getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + sender.setPayload(os.toString()); + String dmeResponse = sender.sendAndWait(5000L); + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); + //System.out.println(result.getInt("status")); + //Here we are checking for error response. If HTTP status + //code is not within the http success response code + //then we consider this as error and return false + if(result.getInt("status") < 200 || result.getInt("status") > 299) { + return false; + } + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); + + + //System.out.println(result.getInt("status")); + //Here we are checking for error response. If HTTP status + //code is not within the http success response code + //then we consider this as error and return false + if(result.getInt("status") < 200 || result.getInt("status") > 299) { + return false; + } + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + } + catch ( IllegalArgumentException x ) { + getLog().warn ( x.getMessage(), x ); + } catch ( IOException x ) { + getLog().warn ( x.getMessage(), x ); + } catch (HttpException x) { + getLog().warn ( x.getMessage(), x ); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + } + return false; + } + + public synchronized MRPublisherResponse sendBatchWithResponse () + { + // it's possible for this call to be made with an empty list. in this case, just return. + if ( fPending.size() < 1 ) + { + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage("No Messages to send"); + return pubResponse; + } + + final long nowMs = Clock.now (); + + host = this.fHostSelector.selectBaseHost(); + + final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); + OutputStream os=null; + try + { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); + os = baseStream; + final String contentType = props.getProperty("contenttype"); + if(contentType.equalsIgnoreCase("application/json")){ + JSONArray jsonArray = new JSONArray(); + for ( TimestampedMessage m : fPending ) + { + JSONObject jsonObject = new JSONObject(m.fMsg); + + jsonArray.put(jsonObject); + + } + os.write (jsonArray.toString().getBytes() ); + }else if (contentType.equalsIgnoreCase("text/plain")){ + for ( TimestampedMessage m : fPending ) + { + os.write ( m.fMsg.getBytes() ); + os.write ( '\n' ); + } + } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ + if ( contentType.equalsIgnoreCase("application/cambria-zip") ) + { + os = new GZIPOutputStream ( baseStream ); + } + for ( TimestampedMessage m : fPending ) + { + + os.write ( ( "" + m.fPartition.length () ).getBytes() ); + os.write ( '.' ); + os.write ( ( "" + m.fMsg.length () ).getBytes() ); + os.write ( '.' ); + os.write ( m.fPartition.getBytes() ); + os.write ( m.fMsg.getBytes() ); + os.write ( '\n' ); + } + os.close (); + }else{ + for ( TimestampedMessage m : fPending ) + { + os.write ( m.fMsg.getBytes() ); + + } + } + + + + final long startMs = Clock.now (); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + + try { + DME2Configue(); + + Thread.sleep(5); + getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + sender.setPayload(os.toString()); + + + String dmeResponse = sender.sendAndWait(5000L); + System.out.println("dmeres->"+dmeResponse); + + + pubResponse = createMRPublisherResponse(dmeResponse,pubResponse); + + if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + final String logLine = String.valueOf((Clock.now() - startMs)) + + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + + } + catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(x.getErrorCode()); + pubResponse.setResponseMessage(x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + } catch (Exception x) { + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } + + return pubResponse; + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); + //System.out.println(result.getInt("status")); + //Here we are checking for error response. If HTTP status + //code is not within the http success response code + //then we consider this as error and return false + + + pubResponse = createMRPublisherResponse(result,pubResponse); + + if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); + final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); + + //System.out.println(result.getInt("status")); + //Here we are checking for error response. If HTTP status + //code is not within the http success response code + //then we consider this as error and return false + pubResponse = createMRPublisherResponse(result,pubResponse); + + if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = String.valueOf((Clock.now() - startMs)); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + } + catch ( IllegalArgumentException x ) { + getLog().warn ( x.getMessage(), x ); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch ( IOException x ) { + getLog().warn ( x.getMessage(), x ); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (HttpException x) { + getLog().warn ( x.getMessage(), x ); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } + + finally { + if (fPending.size()>0) { + getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + pubResponse.setPendingMsgs(fPending.size()); + } + if (os != null) { + try { + os.close(); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage("Error in closing Output Stream"); + } + } + } + + return pubResponse; + } + +private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { + + if (reply.isEmpty()) + { + + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + mrPubResponse.setResponseMessage("Please verify the Producer properties"); + } + else if(reply.startsWith("{")) + { + JSONObject jObject = new JSONObject(reply); + if(jObject.has("message") && jObject.has("status")) + { + String message = jObject.getString("message"); + if(null != message) + { + mrPubResponse.setResponseMessage(message); + } + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + } + else + { + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrPubResponse.setResponseMessage(reply); + } + } + else if (reply.startsWith("<")) + { + String responseCode = getHTTPErrorResponseCode(reply); + if( responseCode.contains("403")) + { + responseCode = "403"; + } + mrPubResponse.setResponseCode(responseCode); + mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } + + return mrPubResponse; + } + + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxBatchAgeMs; + private final boolean fCompress; + private int threadOccuranceTime; + private boolean fClosed; + private String username; + private String password; + private String host; + + //host selector + private HostSelector fHostSelector = null; + + private final LinkedBlockingQueue<TimestampedMessage> fPending; + private long fDontSendUntilMs; + private final ScheduledThreadPoolExecutor fExec; + + private String latitude; + private String longitude; + private String version; + private String serviceName; + private String env; + private String partner; + private String routeOffer; + private String subContextPath; + private String protocol; + private String methodType; + private String url; + private String dmeuser; + private String dmepassword; + private String contentType; + private static final long sfWaitAfterError = 10000; + private HashMap<String, String> DMETimeOuts; + private DME2Client sender; + public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + public String producerFilePath; + private String authKey; + private String authDate; + private String handlers; + private Properties props; + public static String routerFilePath; + public static Map<String, String> headers=new HashMap<String, String>(); + public static MultivaluedMap<String, Object> headersMap; + + + private MRPublisherResponse pubResponse; + + public MRPublisherResponse getPubResponse() { + return pubResponse; + } + public void setPubResponse(MRPublisherResponse pubResponse) { + this.pubResponse = pubResponse; + } + + public static String getRouterFilePath() { + return routerFilePath; + } + + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } + + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + } + + public String getProducerFilePath() { + return producerFilePath; + } + + public void setProducerFilePath(String producerFilePath) { + this.producerFilePath = producerFilePath; + } + + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + + private void DME2Configue() throws Exception { + try { + + /* FileReader reader = new FileReader(new File (producerFilePath)); + Properties props = new Properties(); + props.load(reader);*/ + latitude = props.getProperty("Latitude"); + longitude = props.getProperty("Longitude"); + version = props.getProperty("Version"); + serviceName = props.getProperty("ServiceName"); + env = props.getProperty("Environment"); + partner = props.getProperty("Partner"); + routeOffer = props.getProperty("routeOffer"); + subContextPath = props.getProperty("SubContextPath")+fTopic; + /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){ + subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition"); + }*/ + protocol = props.getProperty("Protocol"); + methodType = props.getProperty("MethodType"); + dmeuser = props.getProperty("username"); + dmepassword = props.getProperty("password"); + contentType = props.getProperty("contenttype"); + handlers = props.getProperty("sessionstickinessrequired"); + routerFilePath= props.getProperty("DME2preferredRouterFilePath"); + + /** + * Changes to DME2Client url to use Partner for auto failover between data centers + * When Partner value is not provided use the routeOffer value for auto failover within a cluster + */ + + + String partitionKey = props.getProperty("partition"); + + if (partner != null && !partner.isEmpty() ) + { + url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; + if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ + url = url + "&partitionKey=" + partitionKey; + } + } + else if (routeOffer!=null && !routeOffer.isEmpty()) + { + url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; + if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ + url = url + "&partitionKey=" + partitionKey; + } + } + + DMETimeOuts = new HashMap<String, String>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("Content-Type", contentType); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); + //System.setProperty("DME2.DEBUG", "true"); + // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true"); + //System.out.println("XXXXXX"+url); + + //SSL changes + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + + //SSL changes + + sender = new DME2Client(new URI(url), 5000L); + + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + sender.setCredentials(dmeuser, dmepassword); + sender.setHeaders(DMETimeOuts); + if(handlers.equalsIgnoreCase("yes")){ + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + }else{ + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } + } catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + throw new DME2Exception(x.getErrorCode(),x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + throw new URISyntaxException(url,x.getMessage()); + } catch (Exception x) { + + getLog().warn(x.getMessage(), x); + throw new Exception(x.getMessage()); + } + } + + private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException + { + super ( hosts ); + + if ( topic == null || topic.length() < 1 ) + { + throw new IllegalArgumentException ( "A topic must be provided." ); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + + fPending = new LinkedBlockingQueue<TimestampedMessage> (); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor ( 1 ); + pubResponse = new MRPublisherResponse(); + + } + + private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException + { + super ( hosts ); + + if ( topic == null || topic.length() < 1 ) + { + throw new IllegalArgumentException ( "A topic must be provided." ); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + threadOccuranceTime=httpThreadOccurnace; + fPending = new LinkedBlockingQueue<TimestampedMessage> (); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor ( 1 ); + fExec.scheduleAtFixedRate ( new Runnable() + { + @Override + public void run () + { + send ( false ); + } + }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS ); + } + + private static class TimestampedMessage extends message + { + public TimestampedMessage ( message m ) + { + super ( m ); + timestamp = Clock.now(); + } + public final long timestamp; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getAuthKey() { + return authKey; + } + + public void setAuthKey(String authKey) { + this.authKey = authKey; + } + + public String getAuthDate() { + return authDate; + } + + public void setAuthDate(String authDate) { + this.authDate = authDate; + } + +} diff --git a/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java b/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java new file mode 100644 index 0000000..102794e --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.response; + +public class MRConsumerResponse { + + private String responseCode; + + private String responseMessage; + + private Iterable<String> actualMessages; + + + + + public String getResponseCode() { + return responseCode; + } + + public void setResponseCode(String responseCode) { + this.responseCode = responseCode; + } + + public String getResponseMessage() { + return responseMessage; + } + + public void setResponseMessage(String responseMessage) { + this.responseMessage = responseMessage; + } + + public Iterable<String> getActualMessages() { + return actualMessages; + } + + public void setActualMessages(Iterable<String> actualMessages) { + this.actualMessages = actualMessages; + } + + +} diff --git a/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java b/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java new file mode 100644 index 0000000..4aebe23 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java @@ -0,0 +1,66 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.client.response; + +/** + * Response for Publisher + * @author author + * + */ +public class MRPublisherResponse { + private String responseCode; + + private String responseMessage; + + private int pendingMsgs; + + public String getResponseCode() { + return responseCode; + } + + public void setResponseCode(String responseCode) { + this.responseCode = responseCode; + } + + public String getResponseMessage() { + return responseMessage; + } + + public void setResponseMessage(String responseMessage) { + this.responseMessage = responseMessage; + } + + public int getPendingMsgs() { + return pendingMsgs; + } + + public void setPendingMsgs(int pendingMsgs) { + this.pendingMsgs = pendingMsgs; + } + + public String toString() { + return "Response Code:" + this.responseCode + "," + + "Response Message:" + this.responseMessage + "," + "Pending Messages Count" + + this.pendingMsgs; + } + +} diff --git a/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java b/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java new file mode 100644 index 0000000..1d3b6be --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java @@ -0,0 +1,50 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.dme.client; + +//package com.att.aft.dme2.api; + +import java.util.logging.Level; +import java.util.logging.Logger; + +//import com.att.aft.dme2.api.DME2FailoverFaultHandler; +//import com.att.aft.dme2.api.util.DME2Constants; +//import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; +//import com.att.aft.dme2.api.util.LogMessage; +//import com.att.aft.dme2.api.util.LogUtil; +public class DefaultLoggingFailoverFaultHandler /*implements DME2FailoverFaultHandler*/ { + /** The logger. */ + //private static Logger logger = DME2Constants.getLogger(DefaultLoggingFailoverFaultHandler.class.getName()); + +// @Override +// public void handleEndpointFailover(/*DME2ExchangeFaultContext context*/) { +// // LogUtil.INSTANCE.report(logger, Level.WARNING, LogMessage.SEP_FAILOVER, context.getService(),context.getRequestURL(),context.getRouteOffer(),context.getResponseCode(),context.getException()); +// } +// @Override +// /** +// * The DME2Exchange already has a log message when the route offer is failed over. We dont need to log it again here. +// */ +// public void handleRouteOfferFailover(DME2ExchangeFaultContext context) { +// //noop +// +// } +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java b/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java new file mode 100644 index 0000000..c70c63f --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.dme.client; + + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; +import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler; +import com.att.aft.dme2.api.util.DME2ExchangeResponseContext; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + + + +//public class HeaderReplyHandler implements DME2ReplyHandler { + + public class HeaderReplyHandler implements DME2ExchangeReplyHandler { + + private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + + + @Override public void handleFault(DME2ExchangeFaultContext responseData) { + // TODO Auto-generated method stub + //StaticCache.getInstance().setHandleFaultInvoked(true); + } + @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) { + // TODO Auto-generated method stub + //StaticCache.getInstance().setHandleEndpointFaultInvoked(true); + } +@Override public void handleReply(DME2ExchangeResponseContext responseData) { + + if(responseData != null) { + MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders(); + if (responseData.getResponseHeaders().get("transactionId")!=null) + fLog.info("Transaction Id : " + responseData.getResponseHeaders().get("transactionId")); + + } +} + +} diff --git a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java new file mode 100644 index 0000000..b7e174c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java @@ -0,0 +1,74 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.dme.client; +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; +import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler; +import com.att.aft.dme2.api.util.DME2ExchangeResponseContext; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + +public class PreferredRouteReplyHandler implements DME2ExchangeReplyHandler { + private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + @Override public void handleReply(DME2ExchangeResponseContext responseData) { + + if(responseData != null) { + MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders(); + if (responseData.getResponseHeaders().get("transactionId")!=null) + + fLog.info("Transaction_Id : " + responseData.getResponseHeaders().get("transactionId")); + + if(responseData.getRouteOffer() != null ){ + routeWriter("preferredRouteKey",responseData.getRouteOffer()); + + } + } +} + + @Override public void handleFault(DME2ExchangeFaultContext responseData) { + // TODO Auto-generated method stub + //StaticCache.getInstance().setHandleFaultInvoked(true); + } + @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) { + // TODO Auto-generated method stub + //StaticCache.getInstance().setHandleEndpointFaultInvoked(true); + } + public void routeWriter(String routeKey, String routeValue){ + + try{ + + FileWriter routeWriter=new FileWriter(new File (MRSimplerBatchPublisher.routerFilePath)); + routeWriter.write(routeKey+"="+routeValue); + routeWriter.close(); + + }catch(Exception ex){ + fLog.error("Reply Router Error " + ex.toString() ); + } + + } +} diff --git a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java new file mode 100644 index 0000000..a65683b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java @@ -0,0 +1,54 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.dme.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.aft.dme2.api.util.DME2ExchangeRequestContext; +import com.att.aft.dme2.api.util.DME2ExchangeRequestHandler; +import com.att.nsa.mr.client.MRClientFactory; + +public class PreferredRouteRequestHandler implements DME2ExchangeRequestHandler { + private Logger fLog = LoggerFactory.getLogger(this.getClass().getName()); + + @Override + public void handleRequest(DME2ExchangeRequestContext requestData) { + + if (requestData != null) { + + requestData.setPreferredRouteOffer(readRoute("preferredRouteKey")); + } + } + + public String readRoute(String routeKey) { + + try { + + MRClientFactory.prop.load(MRClientFactory.routeReader); + + } catch (Exception ex) { + fLog.error("Request Router Error " + ex.toString()); + } + return MRClientFactory.prop.getProperty(routeKey); + } +} diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java new file mode 100644 index 0000000..4df9fcb --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java @@ -0,0 +1,77 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.dme.client; + +import java.util.Map; + +import javax.ws.rs.core.MultivaluedMap; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SimpleExampleConsumer { + + public static void main(String[] args) { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis(); + + try { + + final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties"); + while (true) { + for (String msg : cc.fetch()) { + + System.out.println("Message Received: " + msg); + } + // Header for DME2 Call. + MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; + for (String key : headersMap.keySet()) { + System.out.println("Header Key " + key); + System.out.println("Header Value " + headersMap.get(key)); + } + // Header for HTTP Call. + + Map<String, String> + dme2headersMap=MRClientFactory.DME2HeadersMap; for(String key + : dme2headersMap.keySet()) { System.out.println("Header Key " + + key); System.out.println("Header Value " + + dme2headersMap.get(key)); } + + if (count > nextReport) { + nextReport += 5000; + + final long endMs = System.currentTimeMillis(); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps"); + } + } + } catch (Exception x) { + System.err.println(x.getClass().getName() + ": " + x.getMessage()); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java new file mode 100644 index 0000000..0e8efda --- /dev/null +++ b/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java @@ -0,0 +1,135 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.dme.client; + + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MultivaluedMap; + +import org.json.JSONObject; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * An example of how to use the Java publisher. + * + * @author author + */ +public class SimpleExamplePublisher { + static String content = null; + static String messageSize = null; + static String transport = null; + static String messageCount = null; + + public void publishMessage(String producerFilePath) throws IOException, InterruptedException, Exception { + + // create our publisher + + // publish some messages + + + StringBuilder sb = new StringBuilder(); + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher(producerFilePath); + + if (content.equalsIgnoreCase("text/plain")) { + for (int i = 0; i < Integer.parseInt(messageCount); i++) { + for (int j = 0; j < Integer.parseInt(messageSize); j++) { + sb.append("T"); + } + + pub.send(sb.toString()); + } + } else if (content.equalsIgnoreCase("application/cambria")) { + for (int i = 0; i < Integer.parseInt(messageCount); i++) { + for (int j = 0; j < Integer.parseInt(messageSize); j++) { + sb.append("C"); + } + + pub.send("Key", sb.toString()); + } + } else if (content.equalsIgnoreCase("application/json")) { + for (int i = 0; i < Integer.parseInt(messageCount); i++) { + + final JSONObject msg12 = new JSONObject(); + msg12.put("Name", "DMaaP Reference Client to Test jason Message"); + + pub.send(msg12.toString()); + + } + } + + // ... + + // close the publisher to make sure everything's sent before exiting. + // The batching + // publisher interface allows the app to get the set of unsent messages. + // It could + // write them to disk, for example, to try to send them later. + /* final List<message> stuck = pub.close(20, TimeUnit.SECONDS); + if (stuck.size() > 0) { + System.err.println(stuck.size() + " messages unsent"); + } else { + System.out.println("Clean exit; all messages sent."); + }*/ + + if (transport.equalsIgnoreCase("HTTP")) { + MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; + for (String key : headersMap.keySet()) { + System.out.println("Header Key " + key); + System.out.println("Header Value " + headersMap.get(key)); + } + } else { + Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap; + for (String key : dme2headersMap.keySet()) { + System.out.println("Header Key " + key); + System.out.println("Header Value " + dme2headersMap.get(key)); + } + } + + } + + public static void main(String[] args) throws InterruptedException, Exception { + + String producerFilePath = args[0]; + content = args[1]; + messageSize = args[2]; + transport = args[3]; + messageCount = args[4]; + /*String producerFilePath = null; + content = null; + messageSize =null; + transport =null; + messageCount = null;*/ + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + publisher.publishMessage("D:\\SG\\producer.properties"); + } + +} diff --git a/src/main/java/com/att/nsa/mr/logging/MRAppender.java b/src/main/java/com/att/nsa/mr/logging/MRAppender.java new file mode 100644 index 0000000..8faf0df --- /dev/null +++ b/src/main/java/com/att/nsa/mr/logging/MRAppender.java @@ -0,0 +1,160 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.mr.logging; + +import java.io.IOException; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.spi.LoggingEvent; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher; + +/** + * @author author + * + */ +public class MRAppender extends AppenderSkeleton { + + private MRPublisher fPublisher; + + //Provided through log4j configuration + private String topic; + private String partition; + private String hosts; + private int maxBatchSize = 1; + private int maxAgeMs = 1000; + private boolean compress = false; + + /** + * + */ + public MRAppender() { + super(); + } + + /** + * @param isActive + */ + public MRAppender(boolean isActive) { + super(isActive); + } + + /* (non-Javadoc) + * @see org.apache.log4j.Appender#close() + */ + @Override + public void close() { + if (!this.closed) { + this.closed = true; + fPublisher.close(); + } + } + + /* (non-Javadoc) + * @see org.apache.log4j.Appender#requiresLayout() + */ + @Override + public boolean requiresLayout() { + return false; + } + + /* (non-Javadoc) + * @see org.apache.log4j.AppenderSkeleton#append(org.apache.log4j.spi.LoggingEvent) + */ + @Override + protected void append(LoggingEvent event) { + final String message; + + if (this.layout == null) { + message = event.getRenderedMessage(); + } else { + message = this.layout.format(event); + } + + try { + fPublisher.send(partition, message); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void activateOptions() { + if (hosts != null && topic != null && partition != null) { + fPublisher = MRClientFactory.createBatchingPublisher(hosts.split(","), topic, maxBatchSize, maxAgeMs, compress); + } else { + LogLog.error("The Hosts, Topic, and Partition parameter are required to create a MR Log4J Appender"); + } + } + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getPartition() { + return partition; + } + + public void setPartition(String partition) { + this.partition = partition; + } + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public int getMaxBatchSize() { + return maxBatchSize; + } + + public void setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + public int getMaxAgeMs() { + return maxAgeMs; + } + + public void setMaxAgeMs(int maxAgeMs) { + this.maxAgeMs = maxAgeMs; + } + + public boolean isCompress() { + return compress; + } + + public void setCompress(boolean compress) { + this.compress = compress; + } + +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java new file mode 100644 index 0000000..2294d7b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * A simple publisher that reads from std in, sending each line as a message. + * @author author + */ +public class ConsolePublisher +{ + public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException + { + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com" ); + + // read the topic name from the command line + final String topic = ( args.length > 1 ? args[1] : "TEST-TOPIC" ); + + // read the topic name from the command line + final String partition = ( args.length > 2 ? args[2] : UUID.randomUUID ().toString () ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final long maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip ); + + final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) ); + try + { + String line = null; + while ( ( line = cin.readLine () ) != null ) + { + pub.send ( partition, line ); + } + } + finally + { + List<message> leftovers = null; + try + { + leftovers = pub.close ( 10, TimeUnit.SECONDS ); + } + catch ( InterruptedException e ) + { + System.err.println ( "Send on close interrupted." ); + } + for ( message m : leftovers ) + { + System.err.println ( "Unsent message: " + m.fMsg ); + } + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java new file mode 100644 index 0000000..6e86d47 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.mr.test.clients; + +/** + * @author author + * + */ +public enum ProtocolTypeConstants { + + DME2("DME2"), + AAF_AUTH("HTTPAAF"), + AUTH_KEY("HTTPAUTH"); + + private String value; + + private ProtocolTypeConstants(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java new file mode 100644 index 0000000..8e1c0e0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.LinkedList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SampleConsumer { + public static void main ( String[] args ) + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + + + LOG.info("Sample Consumer Class executing"); + final String topic = "com.att.app.dmaap.mr.testingTopic"; + final String url = ( args.length > 1 ? args[1] : "localhost:8181" ); + final String group = ( args.length > 2 ? args[2] :"grp" ); + /*final String id = ( args.length > 3 ? args[3] : "0" );*/ + final String id = ( args.length > 3 ? args[3] : "1" ); + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + final LinkedList<String> urlList = new LinkedList<String> (); + for ( String u : url.split ( "," ) ) + { + urlList.add ( u ); + } + + final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ); + try + { + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + LOG.info ( "" + (++count) + ": " + msg ); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + //System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + LOG.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + LOG.info ( "" + (++count) + ": consumed message" ); + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java new file mode 100644 index 0000000..36c2b7f --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.MRPublisher.message; + +public class SamplePublisher { + public static void main ( String[] args ) throws IOException, InterruptedException + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" ); + + // read the topic name from the command line + //final String topic = ( args.length > 1 ? args[1] : "MY-EXAMPLE-TOPIC" ); + final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final int maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + + final MRBatchingPublisher pub = new PublisherBuilder (). + usingHosts ( hosts ). + onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs). + authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ). + build () + ; + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "name", "tttttttttttttttt" ); + msg1.put ( "greeting", "ooooooooooooooooo" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + msg2.put ( "now", System.currentTimeMillis () ); + pub.send ( "MyOtherPartitionKey", msg2.toString () ); + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + LOG.warn ( stuck.size() + " messages unsent" ); + } + else + { + LOG.info ( "Clean exit; all messages sent." ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java new file mode 100644 index 0000000..3a131a0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + + + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SimpleExampleConsumer +{ + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java new file mode 100644 index 0000000..b3c167b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java @@ -0,0 +1,90 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +public class SimpleExampleConsumerWithReturnResponse { + + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse(); + System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode()); + + System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage()); + + System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages()); + /*for ( String msg : mrConsumerResponse.getActualMessages() ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + }*/ + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } + +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java new file mode 100644 index 0000000..01e770e --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java @@ -0,0 +1,98 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * An example of how to use the Java publisher. + * @author author + */ +public class SimpleExamplePublisher +{ + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception + { + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "Name", "Sprint" ); + //msg1.put ( "greeting", "Hello .." ); + pub.send ( "First cambria messge" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + //msg2.put ( "mrclient1", System.currentTimeMillis () ); + + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent." ); + } + } + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + publisher.publishMessage("/src/main/resources/dme2/producer.properties"); + } + +} + diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java new file mode 100644 index 0000000..4914688 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java @@ -0,0 +1,84 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.json.JSONObject; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.response.MRPublisherResponse; + /** + *An example of how to use the Java publisher. + * @author author + * + */ + public class SimpleExamplePublisherWithResponse + { + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + String msgCount = args[0]; + SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + int i=0; + while (i< Integer.valueOf(msgCount)) + { + publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount)); + i++; + } + } + + public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException, Exception + { + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true); + // publish some messages + final JSONObject msg1 = new JSONObject (); + + msg1.put ( "Partition:1", "Message:"+count); + msg1.put ( "greeting", "Hello .." ); + + + pub.send ( "1", msg1.toString()); + pub.send ( "1", msg1.toString()); + + MRPublisherResponse res= pub.sendBatchWithResponse(); + + System.out.println("Pub response->"+res.toString()); + } + + + } diff --git a/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java new file mode 100644 index 0000000..3c123b1 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java @@ -0,0 +1,184 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.support; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; + +/** + * A helper for unit testing systems that use a MRPublisher. When setting + * up your test, inject an instance into MRClientFactory to have it return + * the mock client. + * + * @author author + * + */ +public class MRBatchingPublisherMock implements MRBatchingPublisher +{ + public class Entry + { + public Entry ( String partition, String msg ) + { + fPartition = partition; + fMessage = msg; + } + + @Override + public String toString () + { + return fMessage; + } + + public final String fPartition; + public final String fMessage; + } + + public MRBatchingPublisherMock () + { + fCaptures = new LinkedList<Entry> (); + } + + public interface Listener + { + void onMessage ( Entry e ); + } + public void addListener ( Listener listener ) + { + fListeners.add ( listener ); + } + + public List<Entry> getCaptures () + { + return getCaptures ( new MessageFilter () { @Override public boolean match ( String msg ) { return true; } } ); + } + + public interface MessageFilter + { + boolean match ( String msg ); + } + + public List<Entry> getCaptures ( MessageFilter filter ) + { + final LinkedList<Entry> result = new LinkedList<Entry> (); + for ( Entry capture : fCaptures ) + { + if ( filter.match ( capture.fMessage ) ) + { + result.add ( capture ); + } + } + return result; + } + + public int received () + { + return fCaptures.size(); + } + + public void reset () + { + fCaptures.clear (); + } + + @Override + public int send ( String partition, String msg ) + { + final Entry e = new Entry ( partition, msg ); + + fCaptures.add ( e ); + for ( Listener l : fListeners ) + { + l.onMessage ( e ); + } + return 1; + } + + @Override + public int send ( message msg ) + { + return send ( msg.fPartition, msg.fMsg ); + } + @Override + public int send ( String msg ) + { + return 1; + + } + + @Override + public int send ( Collection<message> msgs ) + { + int sum = 0; + for ( message m : msgs ) + { + sum += send ( m ); + } + return sum; + } + + @Override + public int getPendingMessageCount () + { + return 0; + } + + @Override + public List<message> close ( long timeout, TimeUnit timeoutUnits ) + { + return new LinkedList<message> (); + } + + @Override + public void close () + { + } + + @Override + public void setApiCredentials ( String apiKey, String apiSecret ) + { + } + + @Override + public void clearApiCredentials () + { + } + + @Override + public void logTo ( Logger log ) + { + } + + private final LinkedList<Entry> fCaptures; + private LinkedList<Listener> fListeners = new LinkedList<Listener> (); + @Override + public MRPublisherResponse sendBatchWithResponse() { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java b/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java new file mode 100644 index 0000000..3373710 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java @@ -0,0 +1,168 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.support; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; + +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +/** + * A helper for unit testing systems that use a MRConsumer. When setting + * up your test, inject an instance into MRClientFactory to have it return + * the mock client. + * + * @author author + * + */ +public class MRConsumerMock implements MRConsumer +{ + public class Entry + { + public Entry ( long waitMs, int statusCode, List<String> msgs ) + { + fWaitMs = waitMs; + fStatusCode = statusCode; + fStatusMsg = null; + fMsgs = new LinkedList<String> ( msgs ); + } + + public Entry ( long waitMs, int statusCode, String statusMsg ) + { + fWaitMs = waitMs; + fStatusCode = statusCode; + fStatusMsg = statusMsg; + fMsgs = null; + } + + public LinkedList<String> run () throws IOException + { + try + { + Thread.sleep ( fWaitMs ); + if ( fStatusCode >= 200 && fStatusCode <= 299 ) + { + return fMsgs; + } + throw new IOException ( "" + fStatusCode + " " + fStatusMsg ); + } + catch ( InterruptedException e ) + { + throw new IOException ( e ); + } + } + + private final long fWaitMs; + private final int fStatusCode; + private final String fStatusMsg; + private final LinkedList<String> fMsgs; + } + + public MRConsumerMock () + { + fReplies = new LinkedList<Entry> (); + } + + @Override + public void close () + { + } + + @Override + public void setApiCredentials ( String apiKey, String apiSecret ) + { + } + + @Override + public void clearApiCredentials () + { + } + + public synchronized void add ( Entry e ) + { + fReplies.add ( e ); + } + + public void addImmediateMsg ( String msg ) + { + addDelayedMsg ( 0, msg ); + } + + public void addDelayedMsg ( long delay, String msg ) + { + final LinkedList<String> list = new LinkedList<String> (); + list.add ( msg ); + add ( new Entry ( delay, 200, list ) ); + } + + public void addImmediateMsgGroup ( List<String> msgs ) + { + addDelayedMsgGroup ( 0, msgs ); + } + + public void addDelayedMsgGroup ( long delay, List<String> msgs ) + { + final LinkedList<String> list = new LinkedList<String> ( msgs ); + add ( new Entry ( delay, 200, list ) ); + } + + public void addImmediateError ( int statusCode, String statusText ) + { + add ( new Entry ( 0, statusCode, statusText ) ); + } + + @Override + public Iterable<String> fetch () throws IOException + { + return fetch ( -1, -1 ); + } + + @Override + public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException + { + return fReplies.size () > 0 ? fReplies.removeFirst ().run() : new LinkedList<String>(); + } + + @Override + public void logTo ( Logger log ) + { + } + + private final LinkedList<Entry> fReplies; + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse() { + // TODO Auto-generated method stub + return null; + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, + int limit) { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java b/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java new file mode 100644 index 0000000..0faefe7 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java @@ -0,0 +1,134 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.IOException; +import java.io.PrintStream; + +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRIdentityManager; +import com.att.nsa.mr.client.MRClient.MRApiException; +import com.att.nsa.mr.client.MRIdentityManager.ApiKey; + +public class ApiKeyCommand implements Command<MRCommandContext> +{ + + @Override + public String[] getMatches () + { + return new String[]{ + "key (create|update) (\\S*) (\\S*)", + "key (list) (\\S*)", + "key (revoke)", + }; + } + + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + if ( !context.checkClusterReady () ) + { + throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); + } + } + + @Override + public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException + { + final MRIdentityManager tm = MRClientFactory.createIdentityManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() ); + context.applyTracer ( tm ); + + try + { + if ( parts[0].equals ( "list" ) ) + { + final ApiKey key = tm.getApiKey ( parts[1] ); + if ( key != null ) + { + out.println ( "email: " + key.getEmail () ); + out.println ( "description: " + key.getDescription () ); + } + else + { + out.println ( "No key returned" ); + } + } + else if ( parts[0].equals ( "create" ) ) + { + final ApiCredential ac = tm.createApiKey ( parts[1], parts[2] ); + if ( ac != null ) + { + out.println ( " key: " + ac.getApiKey () ); + out.println ( "secret: " + ac.getApiSecret () ); + } + else + { + out.println ( "No credential returned?" ); + } + } + else if ( parts[0].equals ( "update" ) ) + { + tm.updateCurrentApiKey ( parts[1], parts[2] ); + out.println ( "Updated" ); + } + else if ( parts[0].equals ( "revoke" ) ) + { + tm.deleteCurrentApiKey (); + out.println ( "Updated" ); + } + } + catch ( HttpObjectNotFoundException e ) + { + out.println ( "Object not found: " + e.getMessage () ); + } + catch ( HttpException e ) + { + out.println ( "HTTP exception: " + e.getMessage () ); + } + catch ( MRApiException e ) + { + out.println ( "API exception: " + e.getMessage () ); + } + catch ( IOException e ) + { + out.println ( "IO exception: " + e.getMessage () ); + } + finally + { + tm.close (); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "key create <email> <description>" ); + out.println ( "key update <email> <description>" ); + out.println ( "key list <key>" ); + out.println ( "key revoke" ); + } +} diff --git a/src/main/java/com/att/nsa/mr/tools/AuthCommand.java b/src/main/java/com/att/nsa/mr/tools/AuthCommand.java new file mode 100644 index 0000000..72f226b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/AuthCommand.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.PrintStream; + +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; + +public class AuthCommand implements Command<MRCommandContext> +{ + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException + { + if ( parts.length > 0 ) + { + context.setAuth ( parts[0], parts[1] ); + out.println ( "Now authenticating with " + parts[0] ); + } + else + { + context.clearAuth (); + out.println ( "No longer authenticating." ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "auth <apiKey> <apiSecret>" ); + out.println ( "\tuse these credentials on subsequent transactions" ); + out.println ( "noauth" ); + out.println ( "\tdo not use credentials on subsequent transactions" ); + } + + @Override + public String[] getMatches () + { + return new String[] + { + "auth (\\S*) (\\S*)", + "noauth" + }; + } +} diff --git a/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java b/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java new file mode 100644 index 0000000..5ecaf7c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java @@ -0,0 +1,80 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.PrintStream; + +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; +import com.att.nsa.mr.client.impl.MRConsumerImpl; + +public class ClusterCommand implements Command<MRCommandContext> +{ + + @Override + public String[] getMatches () + { + return new String[]{ + "cluster", + "cluster (\\S*)?", + }; + } + + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException + { + if ( parts.length == 0 ) + { + for ( String host : context.getCluster () ) + { + out.println ( host ); + } + } + else + { + context.clearCluster (); + for ( String part : parts ) + { + String[] hosts = part.trim().split ( "\\s+" ); + for ( String host : hosts ) + { + for ( String splitHost : MRConsumerImpl.stringToList(host) ) + { + context.addClusterHost ( splitHost ); + } + } + } + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "cluster host1 host2 ..." ); + } + +} diff --git a/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java b/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java new file mode 100644 index 0000000..3430d9c --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.util.Collection; +import java.util.LinkedList; + +import com.att.nsa.apiClient.http.HttpClient; +import com.att.nsa.apiClient.http.HttpTracer; +import com.att.nsa.cmdtool.CommandContext; +import com.att.nsa.mr.client.MRClient; + +public class MRCommandContext implements CommandContext +{ + public MRCommandContext () + { + fApiKey = null; + fApiPwd = null; + + fCluster = new LinkedList<String> (); + fCluster.add ( "localhost" ); + } + + @Override + public void requestShutdown () + { + fShutdown = true; + } + + @Override + public boolean shouldContinue () + { + return !fShutdown; + } + + public void setAuth ( String key, String pwd ) { fApiKey = key; fApiPwd = pwd; } + public void clearAuth () { setAuth(null,null); } + + public boolean checkClusterReady () + { + return ( fCluster.size () != 0 ); + } + + public Collection<String> getCluster () + { + return new LinkedList<String> ( fCluster ); + } + + public void clearCluster () + { + fCluster.clear (); + } + + public void addClusterHost ( String host ) + { + fCluster.add ( host ); + } + + public String getApiKey () { return fApiKey; } + public String getApiPwd () { return fApiPwd; } + + public void useTracer ( HttpTracer t ) + { + fTracer = t; + } + public void noTracer () { fTracer = null; } + + public void applyTracer ( MRClient cc ) + { + if ( cc instanceof HttpClient && fTracer != null ) + { + ((HttpClient)cc).installTracer ( fTracer ); + } + } + + private boolean fShutdown; + private String fApiKey; + private String fApiPwd; + private final LinkedList<String> fCluster; + private HttpTracer fTracer = null; +} diff --git a/src/main/java/com/att/nsa/mr/tools/MRTool.java b/src/main/java/com/att/nsa/mr/tools/MRTool.java new file mode 100644 index 0000000..7f1effd --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/MRTool.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.IOException; + +import com.att.nsa.cmdtool.CommandLineTool; +import com.att.nsa.mr.client.impl.MRClientVersionInfo; + +public class MRTool extends CommandLineTool<MRCommandContext> +{ + protected MRTool () + { + super ( "MR Tool (" + MRClientVersionInfo.getVersion () + ")", "MR> " ); + + registerCommand ( new ApiKeyCommand () ); + registerCommand ( new AuthCommand () ); + registerCommand ( new ClusterCommand () ); + registerCommand ( new MessageCommand () ); + registerCommand ( new TopicCommand () ); + registerCommand ( new TraceCommand () ); + } + + public static void main ( String[] args ) throws IOException + { + final MRTool ct = new MRTool (); + final MRCommandContext ccc = new MRCommandContext (); + ct.runFromMain ( args, ccc ); + } +} diff --git a/src/main/java/com/att/nsa/mr/tools/MessageCommand.java b/src/main/java/com/att/nsa/mr/tools/MessageCommand.java new file mode 100644 index 0000000..2c646bf --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/MessageCommand.java @@ -0,0 +1,128 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.MRPublisher.message; + +public class MessageCommand implements Command<MRCommandContext> +{ + + @Override + public String[] getMatches () + { + return new String[]{ + "(post) (\\S*) (\\S*) (.*)", + "(read) (\\S*) (\\S*) (\\S*)", + }; + } + + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + if ( !context.checkClusterReady () ) + { + throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); + } + } + + @Override + public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException + { + if ( parts[0].equalsIgnoreCase ( "read" )) + { + final MRConsumer cc = MRClientFactory.createConsumer ( context.getCluster (), parts[1], parts[2], parts[3], + -1, -1, null, context.getApiKey(), context.getApiPwd() ); + context.applyTracer ( cc ); + try + { + for ( String msg : cc.fetch () ) + { + out.println ( msg ); + } + } + catch ( Exception e ) + { + out.println ( "Problem fetching messages: " + e.getMessage() ); + } + finally + { + cc.close (); + } + } + else + { + final MRBatchingPublisher pub = new PublisherBuilder (). + usingHosts ( context.getCluster () ). + onTopic ( parts[1] ). + authenticatedBy ( context.getApiKey(), context.getApiPwd() ). + build () + ; + try + { + pub.send ( parts[2], parts[3] ); + } + catch ( IOException e ) + { + out.println ( "Problem sending message: " + e.getMessage() ); + } + finally + { + List<message> left = null; + try + { + left = pub.close ( 500, TimeUnit.MILLISECONDS ); + } + catch ( IOException e ) + { + out.println ( "Problem sending message: " + e.getMessage() ); + } + catch ( InterruptedException e ) + { + out.println ( "Problem sending message: " + e.getMessage() ); + } + if ( left != null && left.size () > 0 ) + { + out.println ( left.size() + " messages not sent." ); + } + } + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "post <topicName> <partition> <message>" ); + out.println ( "read <topicName> <consumerGroup> <consumerId>" ); + } + +} diff --git a/src/main/java/com/att/nsa/mr/tools/TopicCommand.java b/src/main/java/com/att/nsa/mr/tools/TopicCommand.java new file mode 100644 index 0000000..8010a58 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/TopicCommand.java @@ -0,0 +1,209 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Set; + +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRTopicManager; +import com.att.nsa.mr.client.MRTopicManager.TopicInfo; + +public class TopicCommand implements Command<MRCommandContext> +{ + + @Override + public String[] getMatches () + { + return new String[]{ + "topic (list)", + "topic (list) (\\S*)", + "topic (create) (\\S*) (\\S*) (\\S*)", + "topic (grant|revoke) (read|write) (\\S*) (\\S*)", + }; + } + + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + if ( !context.checkClusterReady () ) + { + throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); + } + } + + @Override + public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException + { + final MRTopicManager tm = MRClientFactory.createTopicManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() ); + context.applyTracer ( tm ); + + try + { + if ( parts[0].equals ( "list" ) ) + { + try + { + if ( parts.length == 1 ) + { + for ( String topic : tm.getTopics () ) + { + out.println ( topic ); + } + } + else + { + final TopicInfo ti = tm.getTopicMetadata ( parts[1] ); + + final String owner = ti.getOwner (); + out.println ( " owner: " + ( owner == null ? "<none>" : owner ) ); + + final String desc = ti.getDescription (); + out.println ( "description: " + ( desc == null ? "<none>" : desc ) ); + + final Set<String> prods = ti.getAllowedProducers (); + if ( prods != null ) + { + out.println ( " write ACL: " ); + for ( String key : prods ) + { + out.println ( "\t" + key ); + } + } + else + { + out.println ( " write ACL: <not active>" ); + } + + final Set<String> cons = ti.getAllowedConsumers (); + if ( cons != null ) + { + out.println ( " read ACL: " ); + for ( String key : cons ) + { + out.println ( "\t" + key ); + } + } + else + { + out.println ( " read ACL: <not active>" ); + } + } + } + catch ( IOException x ) + { + out.println ( "Problem with request: " + x.getMessage () ); + } + catch ( HttpObjectNotFoundException e ) + { + out.println ( "Not found: " + e.getMessage () ); + } + } + else if ( parts[0].equals ( "create" ) ) + { + try + { + final int partitions = Integer.parseInt ( parts[2] ); + final int replicas = Integer.parseInt ( parts[3] ); + + tm.createTopic ( parts[1], "", partitions, replicas ); + } + catch ( HttpException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + catch ( IOException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + catch ( NumberFormatException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + } + else if ( parts[0].equals ( "grant" ) ) + { + try + { + if ( parts[1].equals ( "write" ) ) + { + tm.allowProducer ( parts[2], parts[3] ); + } + else if ( parts[1].equals ( "read" ) ) + { + tm.allowConsumer ( parts[2], parts[3] ); + } + } + catch ( HttpException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + catch ( IOException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + } + else if ( parts[0].equals ( "revoke" ) ) + { + try + { + if ( parts[1].equals ( "write" ) ) + { + tm.revokeProducer ( parts[2], parts[3] ); + } + else if ( parts[1].equals ( "read" ) ) + { + tm.revokeConsumer ( parts[2], parts[3] ); + } + } + catch ( HttpException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + catch ( IOException e ) + { + out.println ( "Problem with request: " + e.getMessage () ); + } + } + } + finally + { + tm.close (); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "topic list" ); + out.println ( "topic list <topicName>" ); + out.println ( "topic create <topicName> <partitions> <replicas>" ); + out.println ( "topic grant write|read <topicName> <apiKey>" ); + out.println ( "topic revoke write|read <topicName> <apiKey>" ); + } + +} diff --git a/src/main/java/com/att/nsa/mr/tools/TraceCommand.java b/src/main/java/com/att/nsa/mr/tools/TraceCommand.java new file mode 100644 index 0000000..0489172 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/TraceCommand.java @@ -0,0 +1,118 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.io.PrintStream; +import java.net.URI; +import java.util.List; +import java.util.Map; + +import com.att.nsa.apiClient.http.HttpTracer; +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandNotReadyException; + +public class TraceCommand implements Command<MRCommandContext> +{ + @Override + public void checkReady ( MRCommandContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, MRCommandContext context, final PrintStream out ) throws CommandNotReadyException + { + if ( parts[0].equalsIgnoreCase ( "on" )) + { + context.useTracer ( new HttpTracer () + { + @Override + public void outbound ( URI uri, Map<String, List<String>> headers, String method, byte[] entity ) + { + out.println ( kLineBreak ); + out.println ( ">>> " + method + " " + uri.toString() ); + for ( Map.Entry<String,List<String>> e : headers.entrySet () ) + { + final StringBuffer vals = new StringBuffer (); + for ( String val : e.getValue () ) + { + if ( vals.length () > 0 ) vals.append ( ", " ); + vals.append ( val ); + } + out.println ( ">>> " + e.getKey () + ": " + vals.toString() ); + } + if ( entity != null ) + { + out.println (); + out.println ( new String ( entity ) ); + } + out.println ( kLineBreak ); + } + + @Override + public void inbound ( Map<String, List<String>> headers, int statusCode, String responseLine, byte[] entity ) + { + out.println ( kLineBreak ); + out.println ( "<<< " + responseLine ); + for ( Map.Entry<String,List<String>> e : headers.entrySet () ) + { + final StringBuffer vals = new StringBuffer (); + for ( String val : e.getValue () ) + { + if ( vals.length () > 0 ) vals.append ( ", " ); + vals.append ( val ); + } + out.println ( "<<< " + e.getKey () + ": " + vals.toString() ); + } + if ( entity != null ) + { + out.println (); + out.println ( new String ( entity ) ); + } + out.println ( kLineBreak ); + } + } ); + } + else + { + context.noTracer (); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "trace on|off" ); + out.println ( "\tWhen trace is on, HTTP interaction is printed to the console." ); + } + + @Override + public String[] getMatches () + { + return new String[] + { + "trace (on)", + "trace (off)" + }; + } + + private static final String kLineBreak = "======================================================================"; +} |