aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/HostSelector.java198
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRBatchingPublisher.java55
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClient.java66
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java449
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java691
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRConsumer.java54
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRIdentityManager.java100
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRPublisher.java93
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRTopicManager.java183
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/Clock.java63
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java85
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java395
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java487
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java59
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java179
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java675
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java52
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java266
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java927
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/response/MRConsumerResponse.java60
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/response/MRPublisherResponse.java67
21 files changed, 5204 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/HostSelector.java b/src/main/java/org/onap/dmaap/mr/client/HostSelector.java
new file mode 100644
index 0000000..9bd73f9
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/HostSelector.java
@@ -0,0 +1,198 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Vector;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HostSelector
+{
+ private final TreeSet<String> fBaseHosts;
+ private final DelayQueue<BlacklistEntry> fBlacklist;
+ private String fIdealHost;
+ private String fCurrentHost;
+ private static final Logger log = LoggerFactory.getLogger(HostSelector.class);
+
+ public HostSelector(String hostPart)
+ {
+ this(makeSet(hostPart), null);
+ }
+
+ public HostSelector(Collection<String> baseHosts)
+ {
+ this(baseHosts, null);
+ }
+
+ public HostSelector(Collection<String> baseHosts, String signature)
+ {
+ if (baseHosts.isEmpty())
+ {
+ throw new IllegalArgumentException("At least one host must be provided.");
+ }
+
+ this.fBaseHosts = new TreeSet(baseHosts);
+ this.fBlacklist = new DelayQueue();
+ this.fIdealHost = null;
+
+ if (signature == null) {
+ return;
+ }
+ int index = 0 ;
+ int value = signature.hashCode();
+ if(value!=0) {
+ index = Math.abs(value) % baseHosts.size();
+ }
+ Iterator it = this.fBaseHosts.iterator();
+ while (index-- > 0)
+ {
+ it.next();
+ }
+ this.fIdealHost = ((String)it.next());
+ }
+
+ public String selectBaseHost()
+ {
+ if (this.fCurrentHost == null)
+ {
+ makeSelection();
+ }
+ return this.fCurrentHost;
+ }
+
+ public void reportReachabilityProblem(long blacklistUnit, TimeUnit blacklistTimeUnit)
+ {
+ if (this.fCurrentHost == null)
+ {
+ log.warn("Reporting reachability problem, but no host is currently selected.");
+ }
+
+ if (blacklistUnit > 0L)
+ {
+ for (BlacklistEntry be : this.fBlacklist)
+ {
+ if (be.getHost().equals(this.fCurrentHost))
+ {
+ be.expireNow();
+ }
+ }
+
+ LinkedList devNull = new LinkedList();
+ this.fBlacklist.drainTo(devNull);
+
+ if (this.fCurrentHost != null)
+ {
+ this.fBlacklist.add(new BlacklistEntry(this.fCurrentHost, TimeUnit.MILLISECONDS.convert(blacklistUnit, blacklistTimeUnit)));
+ }
+ }
+ this.fCurrentHost = null;
+ }
+
+ private String makeSelection()
+ {
+ TreeSet workingSet = new TreeSet(this.fBaseHosts);
+
+ LinkedList devNull = new LinkedList();
+ this.fBlacklist.drainTo(devNull);
+ for (BlacklistEntry be : this.fBlacklist)
+ {
+ workingSet.remove(be.getHost());
+ }
+
+ if (workingSet.isEmpty())
+ {
+ log.warn("All hosts were blacklisted; reverting to full set of hosts.");
+ workingSet.addAll(this.fBaseHosts);
+ this.fCurrentHost = null;
+ }
+
+ String selection = null;
+ if ((this.fCurrentHost != null) && (workingSet.contains(this.fCurrentHost)))
+ {
+ selection = this.fCurrentHost;
+ }
+ else if ((this.fIdealHost != null) && (workingSet.contains(this.fIdealHost)))
+ {
+ selection = this.fIdealHost;
+ }
+ else
+ {
+ int index = 0;
+ int value = new Random().nextInt();
+ Vector v = new Vector(workingSet);
+ if(value!=0) {
+ index = Math.abs(value) % workingSet.size();
+ }
+ selection = (String)v.elementAt(index);
+ }
+
+ this.fCurrentHost = selection;
+ return this.fCurrentHost;
+ }
+
+ private static Set<String> makeSet(String s)
+ {
+ TreeSet set = new TreeSet();
+ set.add(s);
+ return set; }
+
+ private static class BlacklistEntry implements Delayed {
+ private final String fHost;
+ private long fExpireAtMs;
+
+ public BlacklistEntry(String host, long delayMs) {
+ this.fHost = host;
+ this.fExpireAtMs = (System.currentTimeMillis() + delayMs);
+ }
+
+ public void expireNow()
+ {
+ this.fExpireAtMs = 0L;
+ }
+
+ public String getHost()
+ {
+ return this.fHost;
+ }
+
+ public int compareTo(Delayed o)
+ {
+ Long thisDelay = Long.valueOf(getDelay(TimeUnit.MILLISECONDS));
+ return thisDelay.compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS)));
+ }
+
+ public long getDelay(TimeUnit unit)
+ {
+ long remainingMs = this.fExpireAtMs - System.currentTimeMillis();
+ return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRBatchingPublisher.java b/src/main/java/org/onap/dmaap/mr/client/MRBatchingPublisher.java
new file mode 100644
index 0000000..df440bb
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRBatchingPublisher.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
+
+/**
+ * A MR batching publisher is a publisher with additional functionality
+ * for managing delayed sends.
+ *
+ * @author author
+ *
+ */
+public interface MRBatchingPublisher extends MRPublisher
+{
+ /**
+ * Get the number of messages that have not yet been sent.
+ * @return the number of pending messages
+ */
+ int getPendingMessageCount ();
+
+ /**
+ * Close this publisher, sending any remaining messages.
+ * @param timeout an amount of time to wait for unsent messages to be sent
+ * @param timeoutUnits the time unit for the timeout arg
+ * @return a list of any unsent messages after the timeout
+ * @throws IOException exception
+ * @throws InterruptedException exception
+ */
+ List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException;
+
+ MRPublisherResponse sendBatchWithResponse ();
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClient.java b/src/main/java/org/onap/dmaap/mr/client/MRClient.java
new file mode 100644
index 0000000..8cbf9e0
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClient.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import org.slf4j.Logger;
+
+
+public interface MRClient
+{
+ /**
+ * An exception at the MR layer. This is used when the HTTP transport
+ * layer returns a success code but the transaction is not completed as expected.
+ */
+ public class MRApiException extends Exception
+ {
+ private static final long serialVersionUID = 1L;
+ public MRApiException ( String msg ) { super ( msg ); }
+ public MRApiException ( String msg, Throwable t ) { super ( msg, t ); }
+ }
+
+ /**
+ * Optionally set the Logger to use
+ * @param log log
+ */
+ void logTo ( Logger log );
+
+ /**
+ * Set the API credentials for this client connection. Subsequent calls will
+ * include authentication headers.who i
+ */
+ /**
+ * @param apiKey apikey
+ * @param apiSecret apisec
+ */
+ void setApiCredentials ( String apiKey, String apiSecret );
+
+ /**
+ * Remove API credentials, if any, on this connection. Subsequent calls will not include
+ * authentication headers.
+ */
+ void clearApiCredentials ();
+
+ /**
+ * Close this connection. Some client interfaces have additional close capability.
+ */
+ void close ();
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
new file mode 100644
index 0000000..60e0666
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
@@ -0,0 +1,449 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.mr.client.impl.MRMetaClient;
+import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
+
+/**
+ * A collection of builders for various types of MR API clients
+ *
+ * @author author
+ */
+public class MRClientBuilders
+{
+
+ /**
+ * Instantiates MRClientBuilders.
+ */
+ private MRClientBuilders() {
+ // prevent instantiation
+ }
+
+ /**
+ * A builder for a topic Consumer
+ * @author author
+ */
+ public static class ConsumerBuilder
+ {
+ /**
+ * Construct a consumer builder.
+ */
+ public ConsumerBuilder () {}
+
+ /**
+ * Set the host list
+ * @param hostList a comma-separated list of hosts to use to connect to MR
+ * @return this builder
+ */
+ public ConsumerBuilder usingHosts ( String hostList ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ }
+
+ /**
+ * Set the host list
+ * @param hostSet a set of hosts to use to connect to MR
+ * @return this builder
+ */
+ public ConsumerBuilder usingHosts ( Collection<String> hostSet ) {
+ fHosts = hostSet; return this;
+ }
+
+ /**
+ * Set the topic
+ * @param topic the name of the topic to consume
+ * @return this builder
+ */
+ public ConsumerBuilder onTopic ( String topic ) {
+ fTopic=topic;
+ return this;
+ }
+
+ /**
+ * Set the consumer's group and ID
+ * @param consumerGroup The name of the consumer group this consumer is part of
+ * @param consumerId The unique id of this consumer in its group
+ * @return this builder
+ */
+ public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) {
+ fGroup = consumerGroup;
+ fId = consumerId;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Set the server side timeout
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
+ * @return this builder
+ */
+ public ConsumerBuilder waitAtServer ( int timeoutMs ) {
+ fTimeoutMs = timeoutMs;
+ return this;
+ };
+
+ /**
+ * Set the maximum number of messages to receive per transaction
+ * @param limit The maximum number of messages to receive from the server in one transaction.
+ * @return this builder
+ */
+ public ConsumerBuilder receivingAtMost ( int limit ) {
+ fLimit = limit;
+ return this;
+ };
+
+ /**
+ * Set a filter to use on the server
+ * @param filter a Highland Park standard library filter encoded in JSON
+ * @return this builder
+ */
+ public ConsumerBuilder withServerSideFilter ( String filter ) {
+ fFilter = filter;
+ return this;
+ }
+
+ /**
+ * Build the consumer
+ * @return a consumer
+ */
+ public MRConsumer build ()
+ {
+ if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ if ( fGroup == null )
+ {
+ fGroup = UUID.randomUUID ().toString ();
+ fId = "0";
+ log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
+ }
+
+ if ( sfConsumerMock != null ) return sfConsumerMock;
+ try {
+ return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private Collection<String> fHosts = null;
+ private String fTopic = null;
+ private String fGroup = null;
+ private String fId = null;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ private int fTimeoutMs = -1;
+ private int fLimit = -1;
+ private String fFilter = null;
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+ * A publisher builder
+ * @author author
+ */
+ public static class PublisherBuilder
+ {
+ public PublisherBuilder () {}
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( String hostlist ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostlist) );
+ }
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( String[] hostSet )
+ {
+ final TreeSet<String> hosts = new TreeSet<String> ();
+ for ( String hp : hostSet )
+ {
+ hosts.add ( hp );
+ }
+ return usingHosts ( hosts );
+ }
+
+ /**
+ * Set the MR/UEB host(s) to use
+ * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
+ * @return this builder
+ */
+ public PublisherBuilder usingHosts ( Collection<String> hostlist ) {
+ fHosts=hostlist;
+ return this;
+ }
+
+ /**
+ * Set the topic to publish on
+ * @param topic The topic on which to publish messages.
+ * @return this builder
+ */
+ public PublisherBuilder onTopic ( String topic ) {
+ fTopic = topic;
+ return this;
+ }
+
+ /**
+ * Batch message sends with the given limits.
+ * @param messageCount The largest set of messages to batch.
+ * @param ageInMs The maximum age of a message waiting in a batch.
+ * @return this builder
+ */
+ public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) {
+ fMaxBatchSize = messageCount;
+ fMaxBatchAgeMs = ageInMs;
+ return this;
+ }
+
+ /**
+ * Compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder withCompresion () {
+ return enableCompresion(true);
+ }
+
+ /**
+ * Do not compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder withoutCompresion () {
+ return enableCompresion(false);
+ }
+
+ /**
+ * Set the compression option
+ * @param compress true to gzip compress transactions
+ * @return this builder
+ */
+ public PublisherBuilder enableCompresion ( boolean compress ) {
+ fCompress = compress;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Build the publisher
+ * @return a batching publisher
+ */
+ public MRBatchingPublisher build ()
+ {
+ if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ if ( sfPublisherMock != null ) return sfPublisherMock;
+
+ final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
+ againstUrls ( fHosts ).
+ onTopic ( fTopic ).
+ batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
+ compress ( fCompress ).
+ build ();
+ if ( fApiKey != null )
+ {
+ pub.setApiCredentials ( fApiKey, fApiSecret );
+ }
+ return pub;
+ }
+
+ private Collection<String> fHosts = null;
+ private String fTopic = null;
+ private int fMaxBatchSize = 1;
+ private int fMaxBatchAgeMs = 1;
+ private boolean fCompress = false;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ }
+
+ /**
+ * A builder for an identity manager
+ * @author author
+ */
+ public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
+ {
+ /**
+ * Construct an identity manager builder.
+ */
+ public IdentityManagerBuilder () {}
+
+ @Override
+ protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ } }
+ }
+
+ /**
+ * A builder for a topic manager
+ * @author author
+ */
+ public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
+ {
+ /**
+ * Construct an topic manager builder.
+ */
+ public TopicManagerBuilder () {}
+
+ @Override
+ protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ } }
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ * @param cc
+ */
+ public static void $testInject ( MRConsumer cc )
+ {
+ sfConsumerMock = cc;
+ }
+
+ /**
+ * Inject a publisher. Used to support unit tests.
+ * @param pub
+ */
+ public static void $testInject ( MRBatchingPublisher pub )
+ {
+ sfPublisherMock = pub;
+ }
+
+ static MRConsumer sfConsumerMock = null;
+ static MRBatchingPublisher sfPublisherMock = null;
+
+ /**
+ * A builder for an identity manager
+ * @author author
+ */
+ public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
+ {
+ /**
+ * Construct an identity manager builder.
+ */
+ public AbstractAuthenticatedManagerBuilder () {}
+
+ /**
+ * Set the host list
+ * @param hostList a comma-separated list of hosts to use to connect to MR
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) {
+ return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ }
+
+ /**
+ * Set the host list
+ * @param hostSet a set of hosts to use to connect to MR
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) {
+ fHosts = hostSet;
+ return this;
+ }
+
+ /**
+ * Set the API key and secret for this client.
+ * @param apiKey
+ * @param apiSecret
+ * @return this builder
+ */
+ public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
+ }
+
+ /**
+ * Build the consumer
+ * @return a consumer
+ */
+ public T build ()
+ {
+ if ( fHosts.isEmpty() )
+ {
+ throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+ }
+
+ final T mgr = constructClient ( fHosts );
+ mgr.setApiCredentials ( fApiKey, fApiSecret );
+ return mgr;
+ }
+
+ protected abstract T constructClient ( Collection<String> hosts );
+
+ private Collection<String> fHosts = null;
+ private String fApiKey = null;
+ private String fApiSecret = null;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
new file mode 100644
index 0000000..dc9d555
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java
@@ -0,0 +1,691 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.mr.client.impl.MRMetaClient;
+import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.onap.dmaap.mr.tools.ValidatorUtil;
+
+/**
+ * A factory for MR clients.<br/>
+ * <br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each
+ * instance.)<br/>
+ * <br/>
+ * Publishers
+ *
+ * @author author
+ */
+public class MRClientFactory {
+ private static final String AUTH_KEY = "authKey";
+ private static final String AUTH_DATE = "authDate";
+ private static final String PASSWORD = "password";
+ private static final String USERNAME = "username";
+ private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
+ private static final String TOPIC = "topic";
+ private static final String TRANSPORT_TYPE = "TransportType";
+ public static MultivaluedMap<String, Object> HTTPHeadersMap;
+ public static Map<String, String> DME2HeadersMap;
+ public static String routeFilePath;
+
+ public static FileReader routeReader;
+
+ public static FileWriter routeWriter = null;
+ public static Properties prop = null;
+
+ /**
+ * Instantiates MRClientFactory.
+ */
+ private MRClientFactory() {
+ //prevents instantiation.
+ }
+
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "hostname:8080,"
+ *
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(String hostList, String topic) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+ return createConsumer(hostSet, topic, null);
+ }
+
+ /**
+ * Create a consumer instance with server-side filtering, the default
+ * timeout, and no limit on messages returned. This consumer operates as an
+ * independent consumer (i.e., not in a group) and is NOT re-startable
+ * across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param filter
+ * a filter to use on the server side
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+ return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to MR. You
+ * can include port numbers (3904 is the default). For example,
+ * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+ filter, apiKey, apiSecret);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostSet
+ * The host used in the URL to MR. Entries can be "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ *
+ * @return a consumer
+ */
+ public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+ if (MRClientBuilders.sfConsumerMock != null)
+ return MRClientBuilders.sfConsumerMock;
+ try {
+ return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+ apiSecret);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+ * Create a publisher that sends each message (or group of messages)
+ * immediately. Most applications should favor higher latency for much
+ * higher message throughput and the "simple publisher" is not a good
+ * choice.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+ return createBatchingPublisher(hostlist, topic, 1, 1);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown. Message payloads are
+ * not compressed.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs) {
+ return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostlist
+ * The host used in the URL to MR. Can be "host:port", can be
+ * multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ final TreeSet<String> hosts = new TreeSet<String>();
+ for (String hp : hostSet) {
+ hosts.add(hp);
+ }
+ return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+ int maxBatchSize, long maxAgeMs, boolean compress) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param host
+ * A host to be used in the URL to MR. Can be "host:port". Use
+ * multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param username
+ * username
+ * @param password
+ * password
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ * @param protocolFlag
+ * http auth or ueb auth or dme2 method
+ * @param producerFilePath
+ * all properties for publisher
+ * @return MRBatchingPublisher obj
+ */
+ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+ final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
+ MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+ .compress(compress).build();
+
+ pub.setHost(host);
+ pub.setUsername(username);
+ pub.setPassword(password);
+ pub.setProtocolFlag(protocolFlag);
+ return pub;
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, withResponse);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param Properties
+ * props set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(Properties props)
+ throws FileNotFoundException, IOException {
+ return createInternalBatchingPublisher(props, false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+ return createBatchingPublisher(props);
+ }
+
+ /**
+ * Create a publisher that will contain send methods that return response
+ * object to user.
+ *
+ * @param producerFilePath
+ * set all properties for publishing message
+ * @return MRBatchingPublisher obj
+ * @throws FileNotFoundException
+ * exc
+ * @throws IOException
+ * ioex
+ */
+ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(producerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+ return createBatchingPublisher(props, withResponse);
+ }
+
+ protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+ throws FileNotFoundException, IOException {
+ assert props != null;
+ MRSimplerBatchPublisher pub;
+ if (withResponse) {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .onTopic(props.getProperty(TOPIC))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+ .withResponse(withResponse).build();
+ } else {
+ pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .onTopic(props.getProperty(TOPIC))
+ .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+ Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+ .compress(Boolean.parseBoolean(props.getProperty("compress")))
+ .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+ }
+ pub.setHost(props.getProperty("host"));
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
+ pub.setAuthKey(props.getProperty(AUTH_KEY));
+ pub.setAuthDate(props.getProperty(AUTH_DATE));
+ pub.setUsername(props.getProperty(USERNAME));
+ pub.setPassword(props.getProperty(PASSWORD));
+ } else {
+ pub.setUsername(props.getProperty(USERNAME));
+ pub.setPassword(props.getProperty(PASSWORD));
+ }
+ pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
+ pub.setProps(props);
+ prop = new Properties();
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
+ routeReader = new FileReader(new File(routeFilePath));
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ }
+ return pub;
+ }
+
+ /**
+ * Create an identity manager client to work with API keys.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return an identity manager
+ */
+ public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
+ MRIdentityManager cim;
+ try {
+ cim = new MRMetaClient(hostSet);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ cim.setApiCredentials(apiKey, apiSecret);
+ return cim;
+ }
+
+ /**
+ * Create a topic manager for working with topics.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to MR. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return a topic manager
+ */
+ public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
+ MRMetaClient tmi;
+ try {
+ tmi = new MRMetaClient(hostSet);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ tmi.setApiCredentials(apiKey, apiSecret);
+ return tmi;
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ *
+ * @param cc
+ */
+ public static void $testInject(MRConsumer cc) {
+ MRClientBuilders.sfConsumerMock = cc;
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, int i, int j, String protocalFlag, String consumerFilePath) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+ String id, String protocalFlag, String consumerFilePath, int i, int j) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+ FileReader reader = new FileReader(new File(consumerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+
+ return createConsumer(props);
+ }
+
+ public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
+ int timeout;
+ ValidatorUtil.validateSubscriber(props);
+ if (props.getProperty("timeout") != null)
+ timeout = Integer.parseInt(props.getProperty("timeout"));
+ else
+ timeout = -1;
+ int limit;
+ if (props.getProperty("limit") != null)
+ limit = Integer.parseInt(props.getProperty("limit"));
+ else
+ limit = -1;
+ String group;
+ if (props.getProperty("group") == null)
+ group = UUID.randomUUID().toString();
+ else
+ group = props.getProperty("group");
+ MRConsumerImpl sub = null;
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
+ sub.setAuthKey(props.getProperty(AUTH_KEY));
+ sub.setAuthDate(props.getProperty(AUTH_DATE));
+ sub.setUsername(props.getProperty(USERNAME));
+ sub.setPassword(props.getProperty(PASSWORD));
+ } else {
+ sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
+ group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+ props.getProperty(USERNAME), props.getProperty(PASSWORD));
+ sub.setUsername(props.getProperty(USERNAME));
+ sub.setPassword(props.getProperty(PASSWORD));
+ }
+
+ sub.setProps(props);
+ sub.setHost(props.getProperty("host"));
+ sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
+ sub.setfFilter(props.getProperty("filter"));
+ if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
+ routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
+ routeReader = new FileReader(new File(routeFilePath));
+ prop = new Properties();
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ }
+
+ return sub;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRConsumer.java b/src/main/java/org/onap/dmaap/mr/client/MRConsumer.java
new file mode 100644
index 0000000..b2f7563
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRConsumer.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.IOException;
+
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+
+public interface MRConsumer extends MRClient
+{
+ /**
+ * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call.
+
+ * @return a set of messages
+ * @throws IOException
+ */
+ Iterable<String> fetch () throws IOException, Exception;
+
+ /**
+ * Fetch a set of messages with an explicit timeout and limit for this call. These values
+ * override any set in the constructor call.
+ *
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection
+ * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side).
+ * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+ * @return a set messages
+ * @throws IOException if there's a problem connecting to the server
+ */
+ Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException, Exception;
+
+ MRConsumerResponse fetchWithReturnConsumerResponse ();
+
+
+ MRConsumerResponse fetchWithReturnConsumerResponse ( int timeoutMs, int limit );
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRIdentityManager.java b/src/main/java/org/onap/dmaap/mr/client/MRIdentityManager.java
new file mode 100644
index 0000000..1905f82
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRIdentityManager.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.IOException;
+
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+/**
+ * A client for manipulating API keys.
+ * @author author
+ *
+ */
+public interface MRIdentityManager extends MRClient
+{
+ /**
+ * An API Key record
+ */
+ public interface ApiKey
+ {
+ /**
+ * Get the email address associated with the API key
+ * @return the email address on the API key or null
+ */
+ String getEmail ();
+
+ /**
+ * Get the description associated with the API key
+ * @return the description on the API key or null
+ */
+ String getDescription ();
+ }
+
+ /**
+ * Create a new API key on the UEB cluster. The returned credential instance
+ * contains the new API key and API secret. This is the only time the secret
+ * is available to the client -- there's no API for retrieving it later -- so
+ * your application must store it securely.
+ *
+ * @param email
+ * @param description
+ * @return a new credential
+ * @throws HttpException
+ * @throws MRApiException
+ * @throws IOException
+ */
+ ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException;
+
+ /**
+ * Get basic info about a known API key
+ * @param apiKey
+ * @return the API key's info or null if it doesn't exist
+ * @throws HttpObjectNotFoundException, HttpException, MRApiException
+ * @throws IOException
+ */
+ ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, MRApiException, IOException;
+
+ /**
+ * Update the record for the API key used to authenticate this request. The UEB
+ * API requires that you authenticate with the same key you're updating, so the
+ * API key being changed is the one used for setApiCredentials.
+ *
+ * @param email use null to keep the current value
+ * @param description use null to keep the current value
+ * @throws IOException
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ */
+ void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Delete the *current* API key. After this call returns, the API key
+ * used to authenticate will no longer be valid.
+ *
+ * @throws IOException
+ * @throws HttpException
+ */
+ void deleteCurrentApiKey () throws HttpException, IOException;
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRPublisher.java b/src/main/java/org/onap/dmaap/mr/client/MRPublisher.java
new file mode 100644
index 0000000..e900229
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRPublisher.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A MR publishing interface.
+ *
+ */
+public interface MRPublisher extends MRClient
+{
+ /**
+ * A simple message container
+ */
+ public static class message
+ {
+ public message ( String partition, String msg )
+ {
+ fPartition = partition == null ? "" : partition;
+ fMsg = msg;
+ if ( fMsg == null )
+ {
+ throw new IllegalArgumentException ( "Can't send a null message." );
+ }
+ }
+
+ public message ( message msg )
+ {
+ this ( msg.fPartition, msg.fMsg );
+ }
+
+ public final String fPartition;
+ public final String fMsg;
+ }
+
+ /**
+ * Send the given message without partition. partition will be placed at HTTP request level.
+ * @param msg message to sent
+ * @return the number of pending messages
+ * @throws IOException exception
+ */
+ int send ( String msg ) throws IOException;
+ /**
+ * Send the given message using the given partition.
+ * @param partition partition
+ * @param msg message
+ * @return the number of pending messages
+ * @throws IOException exception
+ */
+ int send ( String partition, String msg ) throws IOException;
+
+ /**
+ * Send the given message using its partition.
+ * @param msg mesg
+ * @return the number of pending messages
+ * @throws IOException exp
+ */
+ int send ( message msg ) throws IOException;
+
+ /**
+ * Send the given messages using their partitions.
+ * @param msgs msg
+ * @return the number of pending messages
+ * @throws IOException exp
+ */
+ int send ( Collection<message> msgs ) throws IOException;
+
+ /**
+ * Close this publisher. It's an error to call send() after close()
+ */
+ void close ();
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRTopicManager.java b/src/main/java/org/onap/dmaap/mr/client/MRTopicManager.java
new file mode 100644
index 0000000..54ca7ec
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/MRTopicManager.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+
+/**
+ * A client for working with topic metadata.
+ * @author author
+ */
+public interface MRTopicManager extends MRClient
+{
+ /**
+ * Get the topics available in the cluster
+ * @return a set of topic names
+ * @throws IOException
+ */
+ Set<String> getTopics () throws IOException;
+
+ /**
+ * Information about a topic.
+ */
+ public interface TopicInfo
+ {
+ /**
+ * Get the owner of the topic
+ * @return the owner, or null if no entry
+ */
+ String getOwner ();
+
+ /**
+ * Get the description for this topic
+ * @return the description, or null if no entry
+ */
+ String getDescription ();
+
+ /**
+ * Get the set of allowed producers (as API keys) on this topic
+ * @return the set of allowed producers, null of no ACL exists/enabled
+ */
+ Set<String> getAllowedProducers ();
+
+ /**
+ * Get the set of allowed consumers (as API keys) on this topic
+ * @return the set of allowed consumers, null of no ACL exists/enabled
+ */
+ Set<String> getAllowedConsumers ();
+ }
+
+ /**
+ * Get information about a topic.
+ * @param topic
+ * @return topic information
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Create a new topic.
+ * @param topicName
+ * @param topicDescription
+ * @param partitionCount
+ * @param replicationCount
+ * @throws HttpException
+ * @throws IOException
+ */
+ void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException;
+
+ /**
+ * Delete the topic. This call must be authenticated and the API key listed as owner on the topic.
+ * NOTE: The MR (UEB) API server does not support topic deletion at this time (mid 2015)
+ * @param topic
+ * @throws HttpException
+ * @throws IOException
+ * @deprecated If/when the Kafka system supports topic delete, or the implementation changes, this will be restored.
+ */
+ @Deprecated
+ void deleteTopic ( String topic ) throws HttpException, IOException;
+
+ /**
+ * Can any client produce events into this topic without authentication?
+ * @param topic
+ * @return true if the topic is open for producing
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Get the set of allowed producers. If the topic is open, the result is null.
+ * @param topic
+ * @return a set of allowed producers or null
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Allow the given API key to produce messages on the given topic. The caller must
+ * own this topic.
+ * @param topic
+ * @param apiKey
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ * @throws IOException
+ */
+ void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Revoke the given API key's authorization to produce messages on the given topic.
+ * The caller must own this topic.
+ * @param topic
+ * @param apiKey
+ * @throws HttpException
+ * @throws IOException
+ */
+ void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException;
+
+ /**
+ * Can any client consume events from this topic without authentication?
+ * @param topic
+ * @return true if the topic is open for consuming
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Get the set of allowed consumers. If the topic is open, the result is null.
+ * @param topic
+ * @return a set of allowed consumers or null
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Allow the given API key to consume messages on the given topic. The caller must
+ * own this topic.
+ * @param topic
+ * @param apiKey
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ * @throws IOException
+ */
+ void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Revoke the given API key's authorization to consume messages on the given topic.
+ * The caller must own this topic.
+ * @param topic
+ * @param apiKey
+ * @throws HttpException
+ * @throws IOException
+ */
+ void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException;
+}
+
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java
new file mode 100644
index 0000000..6670399
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+public class Clock
+{
+ public synchronized static Clock getIt ()
+ {
+ if ( sfClock == null )
+ {
+ sfClock = new Clock ();
+ }
+ return sfClock;
+ }
+
+ /**
+ * Get the system's current time in milliseconds.
+ * @return the current time
+ */
+ public static long now ()
+ {
+ return getIt().nowImpl ();
+ }
+
+ /**
+ * Get current time in milliseconds
+ * @return current time in ms
+ */
+ protected long nowImpl ()
+ {
+ return System.currentTimeMillis ();
+ }
+
+ protected Clock ()
+ {
+ }
+
+ private static Clock sfClock = null;
+
+ protected synchronized static void register ( Clock testClock )
+ {
+ sfClock = testClock;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java
new file mode 100644
index 0000000..b06290a
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+
+public class DmaapClientUtil {
+
+ private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
+ private static final String MR_DATE_CONSTANT = "X-CambriaDate";
+
+ public static WebTarget getTarget(final String path, final String username, final String password) {
+
+ Client client = ClientBuilder.newClient();
+ HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
+ client.register(feature);
+
+ return client.target(path);
+ }
+
+ public static WebTarget getTarget(final String path) {
+
+ Client client = ClientBuilder.newClient();
+ return client.target(path);
+ }
+
+ public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) {
+ return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
+
+ }
+
+ public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password,byte[] data, String contentType) {
+ return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).post(Entity.entity(data, contentType));
+
+ }
+
+ public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) {
+
+ return target.request().header("Authorization", "Basic " + authHeader).get();
+
+ }
+
+ public static Response postResponsewtBasicAuth(WebTarget target, String authHeader,byte[] data,String contentType) {
+
+ return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType));
+
+ }
+
+ public static Response getResponsewtNoAuth(WebTarget target) {
+
+ return target.request().get();
+
+ }
+
+ public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) {
+ return target.request().post(Entity.entity(data, contentType));
+
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
new file mode 100644
index 0000000..ecea21c
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
@@ -0,0 +1,395 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import org.apache.http.HttpException;
+import org.glassfish.jersey.internal.util.Base64;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiClient.http.CacheUse;
+import com.att.nsa.apiClient.http.HttpClient;
+import org.onap.dmaap.mr.client.MRClient;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+
+public class MRBaseClient extends HttpClient implements MRClient {
+
+
+ protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
+
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, stdSvcPort);
+
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
+ TimeUnit.MILLISECONDS, 32, 32, 600000);
+
+ fLog = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void close() {
+ }
+
+ protected Set<String> jsonArrayToSet(JSONArray a) {
+ if (a == null)
+ return null;
+
+ final TreeSet<String> set = new TreeSet<String>();
+ for (int i = 0; i < a.length(); i++) {
+ set.add(a.getString(i));
+ }
+ return set;
+ }
+
+ public void logTo(Logger log) {
+ fLog = log;
+ replaceLogger(log);
+ }
+
+ protected Logger getLog() {
+ return fLog;
+ }
+
+ private Logger fLog;
+
+ public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
+ final String password, final String protocalFlag) throws HttpException, JSONException {
+ if ((null != username && null != password)) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding,data, contentType);
+
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public JSONObject postNoAuth(final String path, final byte[] data, String contentType)
+ throws HttpException, JSONException {
+ WebTarget target = null;
+ Response response = null;
+ if (contentType == null) {
+ contentType = "text/pain";
+ }
+ target = DmaapClientUtil.getTarget(path);
+
+ response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
+
+ return getResponseDataInJson(response);
+ }
+
+ public String postWithResponse(final String path, final byte[] data, final String contentType,
+ final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
+ String responseData = null;
+ if ((null != username && null != password)) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding,data, contentType);
+
+ responseData = (String)response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String postNoAuthWithResponse(final String path, final byte[] data, String contentType)
+ throws HttpException, JSONException {
+
+ String responseData = null;
+ WebTarget target = null;
+ Response response = null;
+ if (contentType == null) {
+ contentType = "text/pain";
+ }
+ target = DmaapClientUtil.getTarget(path);
+
+ response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
+ responseData = (String) response.readEntity(String.class);
+ return responseData;
+ }
+
+ public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,
+ final String authDate, final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
+ if ((null != username && null != password)) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ response =DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
+ final String authKey, final String authDate, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if ((null != username && null != password)) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
+ responseData = (String)response.readEntity(String.class);
+ return responseData;
+
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
+ if (null != username && null != password) {
+
+ WebTarget target=null;
+ Response response=null;
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ target = DmaapClientUtil.getTarget(path);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
+ } else {
+ target = DmaapClientUtil.getTarget(path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
+
+ }
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String getResponse(final String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if (null != username && null != password) {
+ WebTarget target=null;
+ Response response=null;
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ target = DmaapClientUtil.getTarget(path);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
+ } else {
+ target = DmaapClientUtil.getTarget(path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+ response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
+ }
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
+ }
+
+ responseData = (String)response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
+ if (null != username && null != password) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
+
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public JSONObject getNoAuth(final String path) throws HttpException, JSONException {
+
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(path);
+ response = DmaapClientUtil.getResponsewtNoAuth(target);
+
+ return getResponseDataInJson(response);
+ }
+
+ public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if (null != username && null != password) {
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
+
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
+ }
+
+ responseData = (String)response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(
+ "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+ }
+ }
+
+ public String getNoAuthResponse(String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ WebTarget target=null;
+ Response response=null;
+ target = DmaapClientUtil.getTarget(path, username, password);
+ response = DmaapClientUtil.getResponsewtNoAuth(target);
+
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
+ }
+
+ responseData = (String)response.readEntity(String.class);
+ return responseData;
+
+ }
+
+
+ private JSONObject getResponseDataInJson(Response response) throws JSONException {
+ try {
+ MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+
+ // MultivaluedMap<String, Object> headersMap =
+ // for(String key : headersMap.keySet()) {
+ String transactionid = response.getHeaderString("transactionid");
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ fLog.info("TransactionId : " + transactionid);
+ }
+
+
+ if (response.getStatus() == 403) {
+ JSONObject jsonObject = null;
+ jsonObject = new JSONObject();
+ JSONArray jsonArray = new JSONArray();
+ jsonArray.put(response.getEntity());
+ jsonObject.put("result", jsonArray);
+ jsonObject.put("status", response.getStatus());
+ return jsonObject;
+ }
+ String responseData = (String)response.readEntity(String.class);
+
+ JSONTokener jsonTokener = new JSONTokener(responseData);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ jsonObject.put("status", response.getStatus());
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonObject.put("status", response.getStatus());
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ fLog.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+
+ }
+
+ public String getHTTPErrorResponseMessage(String responseString) {
+
+ String response = null;
+ int beginIndex = 0;
+ int endIndex = 0;
+ if (responseString.contains("<body>")) {
+
+ beginIndex = responseString.indexOf("body>") + 5;
+ endIndex = responseString.indexOf("</body");
+ response = responseString.substring(beginIndex, endIndex);
+ }
+
+ return response;
+
+ }
+
+ public String getHTTPErrorResponseCode(String responseString) {
+
+ String response = null;
+ int beginIndex = 0;
+ int endIndex = 0;
+ if (responseString.contains("<title>")) {
+ beginIndex = responseString.indexOf("title>") + 6;
+ endIndex = responseString.indexOf("</title");
+ response = responseString.substring(beginIndex, endIndex);
+ }
+
+ return response;
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
new file mode 100644
index 0000000..bcd4403
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
@@ -0,0 +1,487 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.GZIPOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiClient.http.HttpClient;
+import com.att.nsa.apiClient.http.HttpException;
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
+
+/**
+ * This is a batching publisher class that allows the client to publish messages
+ * in batches that are limited in terms of size and/or hold time.
+ *
+ * @author author
+ * @deprecated This class's tricky locking doesn't quite work
+ *
+ */
+@Deprecated
+public class MRBatchPublisher implements MRBatchingPublisher
+{
+ public static final long kMinMaxAgeMs = 1;
+
+ /**
+ * Create a batch publisher.
+ *
+ * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
+ * @param topic the topic to publish to
+ * @param maxBatchSize the maximum size of a batch
+ * @param maxAgeMs the maximum age of a batch
+ */
+ public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
+ {
+ if ( maxAgeMs < kMinMaxAgeMs )
+ {
+ fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
+ maxAgeMs = kMinMaxAgeMs;
+ }
+
+ try {
+ fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException (e);
+ }
+
+ // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
+ // the oldest msg to hit max age? (locking is complicated, but should be do-able)
+ fExec = new ScheduledThreadPoolExecutor ( 1 );
+ fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
+ }
+
+ @Override
+ public void setApiCredentials ( String apiKey, String apiSecret )
+ {
+ fSender.setApiCredentials ( apiKey, apiSecret );
+ }
+
+ @Override
+ public void clearApiCredentials ()
+ {
+ fSender.clearApiCredentials ();
+ }
+
+ /**
+ * Send the given message with the given partition
+ * @param partition
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public int send ( String partition, String msg ) throws IOException
+ {
+ return send ( new message ( partition, msg ) );
+ }
+ @Override
+ public int send ( String msg ) throws IOException
+ {
+ return send ( new message ( "",msg ) );
+ }
+ /**
+ * Send the given message
+ * @param userMsg a message
+ * @throws IOException
+ */
+ @Override
+ public int send ( message userMsg ) throws IOException
+ {
+ final LinkedList<message> list = new LinkedList<message> ();
+ list.add ( userMsg );
+ return send ( list );
+ }
+
+ /**
+ * Send the given set of messages
+ * @param msgs the set of messages, sent in order of iteration
+ * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
+ * @throws IOException
+ */
+ @Override
+ public int send ( Collection<message> msgs ) throws IOException
+ {
+ if ( msgs.isEmpty() )
+ {
+ fSender.queue ( msgs );
+ }
+ return fSender.size ();
+ }
+
+ @Override
+ public int getPendingMessageCount ()
+ {
+ return fSender.size ();
+ }
+
+ /**
+ * Send any pending messages and close this publisher.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void close ()
+ {
+ try
+ {
+ final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
+ if ( remains.isEmpty() )
+ {
+ fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
+ + "(Consider using the alternate close method to capture unsent messages in this case.)" );
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ fLog.warn ( "Possible message loss. " + e.getMessage(), e );
+ Thread.currentThread().interrupt();
+ }
+ catch ( IOException e )
+ {
+ fLog.warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ }
+
+ public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
+ {
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
+ fExec.shutdown ();
+
+ final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+ final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
+ while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
+ {
+ fSender.checkSend ( true );
+ Thread.sleep ( 250 );
+ }
+
+ final LinkedList<message> result = new LinkedList<message> ();
+ fSender.drainTo ( result );
+ return result;
+ }
+
+ private final ScheduledThreadPoolExecutor fExec;
+ private final Sender fSender;
+
+ private static class TimestampedMessage extends message
+ {
+ public TimestampedMessage ( message m )
+ {
+ super ( m );
+ timestamp = System.currentTimeMillis ();
+ }
+ public final long timestamp;
+ }
+
+ private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
+
+ private class Sender extends MRBaseClient implements Runnable
+ {
+ public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
+ {
+ super ( baseUrls );
+
+ fNextBatch = new LinkedList<TimestampedMessage> ();
+ fSendingBatch = null;
+ fTopic = topic;
+ fMaxBatchSize = maxBatch;
+ fMaxAgeMs = maxAgeMs;
+ fCompress = compress;
+ fLock = new ReentrantReadWriteLock ();
+ fWriteLock = fLock.writeLock ();
+ fReadLock = fLock.readLock ();
+ fDontSendUntilMs = 0;
+ }
+
+ public void drainTo ( LinkedList<message> list )
+ {
+ fWriteLock.lock ();
+ try
+ {
+ if ( fSendingBatch != null )
+ {
+ list.addAll ( fSendingBatch );
+ }
+ list.addAll ( fNextBatch );
+
+ fSendingBatch = null;
+ fNextBatch.clear ();
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+
+ /**
+ * Called periodically by the background executor.
+ */
+ @Override
+ public void run ()
+ {
+ try
+ {
+ checkSend ( false );
+ }
+ catch ( IOException e )
+ {
+ fLog.warn ( "MR background send: " + e.getMessage () );
+ fLog.error( "IOException " + e );
+ }
+ }
+
+ public int size ()
+ {
+ fReadLock.lock ();
+ try
+ {
+ return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
+ }
+ finally
+ {
+ fReadLock.unlock ();
+ }
+ }
+
+ /**
+ * Called to queue a message.
+ * @param m
+ * @throws IOException
+ */
+ public void queue ( Collection<message> msgs ) throws IOException
+ {
+ fWriteLock.lock ();
+ try
+ {
+ for ( message userMsg : msgs )
+ {
+ if ( userMsg != null )
+ {
+ fNextBatch.add ( new TimestampedMessage ( userMsg ) );
+ }
+ else
+ {
+ fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
+ }
+ }
+ }
+ finally
+ {
+ fWriteLock.unlock();
+ }
+ checkSend ( false );
+ }
+
+ /**
+ * Send a batch if the queue is long enough, or the first pending message is old enough.
+ * @param force
+ * @throws IOException
+ */
+ public void checkSend ( boolean force ) throws IOException
+ {
+ // hold a read lock just long enough to evaluate whether a batch
+ // should be sent
+ boolean shouldSend = false;
+ fReadLock.lock ();
+ try
+ {
+ if ( fNextBatch.isEmpty() )
+ {
+ final long nowMs = System.currentTimeMillis ();
+ shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
+ if ( !shouldSend )
+ {
+ final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, unless forced, wait after an error
+ shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
+ }
+ // else: even in 'force', there's nothing to send, so shouldSend=false is fine
+ }
+ finally
+ {
+ fReadLock.unlock ();
+ }
+
+ // if a send is required, acquire a write lock, swap out the next batch,
+ // swap in a fresh batch, and release the lock for the caller to start
+ // filling a batch again. After releasing the lock, send the current
+ // batch. (There could be more messages added between read unlock and
+ // write lock, but that's fine.)
+ if ( shouldSend )
+ {
+ fSendingBatch = null;
+
+ fWriteLock.lock ();
+ try
+ {
+ fSendingBatch = fNextBatch;
+ fNextBatch = new LinkedList<TimestampedMessage> ();
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+
+ if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
+ {
+ fLog.warn ( "Send failed, rebuilding send queue." );
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
+
+ // the send failed. reconstruct the pending queue
+ fWriteLock.lock ();
+ try
+ {
+ final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
+ fNextBatch = fSendingBatch;
+ fNextBatch.addAll ( nextGroup );
+ fSendingBatch = null;
+ fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+ else
+ {
+ fWriteLock.lock ();
+ try
+ {
+ fSendingBatch = null;
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+ }
+ }
+
+ private LinkedList<TimestampedMessage> fNextBatch;
+ private LinkedList<TimestampedMessage> fSendingBatch;
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxAgeMs;
+ private final boolean fCompress;
+ private final ReentrantReadWriteLock fLock;
+ private final WriteLock fWriteLock;
+ private final ReadLock fReadLock;
+ private long fDontSendUntilMs;
+ private static final long sfWaitAfterError = 1000;
+ }
+
+ // this is static so that it's clearly not using any mutable member data outside of a lock
+ private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
+ {
+ // it's possible for this call to be made with an empty list. in this case, just return.
+ if ( toSend.isEmpty() )
+ {
+ return true;
+ }
+
+ final long nowMs = System.currentTimeMillis ();
+ final String url = MRConstants.makeUrl ( topic );
+
+ log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+ try
+ {
+ OutputStream os = baseStream;
+ if ( compress )
+ {
+ os = new GZIPOutputStream ( baseStream );
+ }
+ for ( TimestampedMessage m : toSend )
+ {
+ os.write ( ( "" + m.fPartition.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( ( "" + m.fMsg.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( m.fPartition.getBytes() );
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ }
+ catch ( IOException e )
+ {
+ log.warn ( "Problem writing stream to post: " + e.getMessage (),e );
+ return false;
+ }
+
+ boolean result = false;
+ final long startMs = System.currentTimeMillis ();
+ try
+ {
+ client.post ( url, compress ?
+ MRFormat.CAMBRIA_ZIP.toString () :
+ MRFormat.CAMBRIA.toString (),
+ baseStream.toByteArray(), false );
+ result = true;
+ }
+ catch ( HttpException e )
+ {
+ log.warn ( "Problem posting to MR: " + e.getMessage(),e );
+ }
+ catch ( IOException e )
+ {
+ log.warn ( "Problem posting to MR: " + e.getMessage(),e );
+ }
+
+ log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
+ return result;
+ }
+
+ @Override
+ public void logTo ( Logger log )
+ {
+ fLog = log;
+ }
+
+ @Override
+ public MRPublisherResponse sendBatchWithResponse() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
new file mode 100644
index 0000000..ed23918
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MRClientVersionInfo
+{
+ private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class);
+ public static String getVersion ()
+ {
+ return version;
+ }
+
+ private static final Properties props = new Properties();
+ private static final String version;
+ static
+ {
+ String use = null;
+ try
+ {
+ final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" );
+ if ( is != null )
+ {
+ props.load ( is );
+ use = props.getProperty ( "MRClientVersion", null );
+ }
+ }
+ catch ( IOException e )
+ {
+ logger.error("exception: ", e);
+ }
+ version = use;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
new file mode 100644
index 0000000..f6d9578
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
@@ -0,0 +1,179 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.http.HttpHost;
+
+class MRConstants
+{
+ private static final String PROTOCOL = "http";
+ public static final String context = "/";
+ public static final String kBasePath = "events/";
+ public static final int kStdMRServicePort = 8080;
+
+ public static String escape ( String s )
+ {
+ try
+ {
+ return URLEncoder.encode ( s, "UTF-8");
+ }
+ catch ( UnsupportedEncodingException e )
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public static String makeUrl ( String rawTopic )
+ {
+ final String cleanTopic = escape ( rawTopic );
+
+ final StringBuffer url = new StringBuffer().
+ append ( MRConstants.context ).
+ append ( MRConstants.kBasePath ).
+ append ( cleanTopic );
+ return url.toString ();
+ }
+
+ public static String makeUrl ( final String host, final String rawTopic )
+ {
+ final String cleanTopic = escape ( rawTopic );
+
+ final StringBuffer url = new StringBuffer();
+
+ if (!host.startsWith("http") || !host.startsWith("https") ) {
+ url.append( PROTOCOL + "://" );
+ }
+ url.append(host);
+ url.append ( MRConstants.context );
+ url.append ( MRConstants.kBasePath );
+ url.append ( cleanTopic );
+ return url.toString ();
+ }
+
+ public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion )
+ {
+ final String cleanTopic = escape ( rawTopic );
+
+ final StringBuffer url = new StringBuffer();
+
+ if (transferprotocol !=null && !transferprotocol.equals("")) {
+ url.append( transferprotocol + "://" );
+ }else{
+ url.append( PROTOCOL + "://" );
+ }
+ url.append(host);
+ url.append ( MRConstants.context );
+ url.append ( MRConstants.kBasePath );
+ url.append ( cleanTopic );
+ if(parttion!=null && !parttion.equalsIgnoreCase(""))
+ url.append("?partitionKey=").append(parttion);
+ return url.toString ();
+ }
+ public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId )
+ {
+ final String cleanConsumerGroup = escape ( rawConsumerGroup );
+ final String cleanConsumerId = escape ( rawConsumerId );
+ return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
+ }
+
+ /**
+ * Create a list of HttpHosts from an input list of strings. Input strings have
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts
+ * @return a list of hosts
+ */
+ public static List<HttpHost> createHostsList(Collection<String> hosts)
+ {
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ for ( String host : hosts )
+ {
+ if ( host.length () == 0 ) continue;
+ convertedHosts.add ( hostForString ( host ) );
+ }
+ return convertedHosts;
+ }
+
+ /**
+ * Return an HttpHost from an input string. Input string has
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts
+ * @return a list of hosts
+ */
+ public static HttpHost hostForString ( String host )
+ {
+ if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
+
+ String hostPart = host;
+ int port = kStdMRServicePort;
+
+ final int colon = host.indexOf ( ':' );
+ if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
+ if ( colon > 0 )
+ {
+ hostPart = host.substring ( 0, colon ).trim();
+
+ final String portPart = host.substring ( colon + 1 ).trim();
+ if ( portPart.length () > 0 )
+ {
+ try
+ {
+ port = Integer.parseInt ( portPart );
+ }
+ catch ( NumberFormatException x )
+ {
+ throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
+ }
+ }
+ // else: use default port on "foo:"
+ }
+
+ return new HttpHost ( hostPart, port );
+ }
+
+ public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) {
+ final String cleanConsumerGroup = escape ( fGroup );
+ final String cleanConsumerId = escape ( fId );
+
+ StringBuffer url = new StringBuffer();
+
+ if (transferprotocol !=null && !transferprotocol.equals("")) {
+ url.append( transferprotocol + "://" );
+ }else{
+ url.append( PROTOCOL + "://" );
+ }
+
+ url.append(host);
+ url.append(context);
+ url.append(kBasePath);
+ url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId);
+
+ return url.toString();
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
new file mode 100644
index 0000000..a1a52aa
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
@@ -0,0 +1,675 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+import org.onap.dmaap.mr.client.HostSelector;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
+
+ private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+ public static final String routerFilePath = null;
+
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String consumerFilePath;
+
+ private static final String SUCCESS_MESSAGE = "Success";
+ private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
+ private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
+
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final int fTimeoutMs;
+ private final int fLimit;
+ private String fFilter;
+ private String username;
+ private String password;
+ private String host;
+ private HostSelector fHostSelector = null;
+ private String url;
+ private DME2Client sender;
+ private String authKey;
+ private String authDate;
+ private Properties props;
+ private HashMap<String, String> DMETimeOuts;
+ private long dme2ReplyHandlerTimeoutMs;
+ private long longPollingMs;
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+ String apiSecret_password) throws MalformedURLException {
+ this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+ false);
+ }
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+ boolean allowSelfSignedCerts) throws MalformedURLException {
+ super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
+
+ fTopic = topic;
+ fGroup = consumerGroup;
+ fId = consumerId;
+ fTimeoutMs = timeoutMs;
+ fLimit = limit;
+ fFilter = filter;
+
+ fHostSelector = new HostSelector(hostPart);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException, Exception {
+ // fetch with the timeout and limit set in constructor
+ return fetch(fTimeoutMs, fLimit);
+ }
+
+ @Override
+ public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
+ final LinkedList<String> msgs = new LinkedList<>();
+
+ try {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ dmeConfigure(timeoutMs, limit);
+ try {
+ String reply = sender.sendAndWait(timeoutMs + 10000L);
+ final JSONObject o = getResponseDataInJson(reply);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
+
+ try {
+ final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = getNoAuth(urlPath);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ } catch (Exception e) {
+ throw e;
+ }
+
+ return msgs;
+ }
+
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse() {
+ // fetch with the timeout and limit set in constructor
+ return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
+ }
+
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
+ final LinkedList<String> msgs = new LinkedList<String>();
+ MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
+ try {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ dmeConfigure(timeoutMs, limit);
+
+ long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
+ : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
+ String reply = sender.sendAndWait(timeout);
+
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(reply, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ String response = getResponse(urlPath, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
+
+ String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ } catch (JSONException e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("json exception: ", e);
+ } catch (HttpException e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("http exception: ", e);
+ } catch (DME2Exception e) {
+ mrConsumerResponse.setResponseCode(e.getErrorCode());
+ mrConsumerResponse.setResponseMessage(e.getErrorMessage());
+ log.error("DME2 exception: ", e);
+ } catch (Exception e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("exception: ", e);
+ }
+ mrConsumerResponse.setActualMessages(msgs);
+ return mrConsumerResponse;
+ }
+
+ @Override
+ protected void reportProblemWithResponse() {
+ log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
+ super.reportProblemWithResponse();
+ fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
+ }
+
+ private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
+ if (reply.startsWith("{")) {
+ JSONObject jObject = new JSONObject(reply);
+ String message = jObject.getString("message");
+ int status = jObject.getInt("status");
+
+ mrConsumerResponse.setResponseCode(Integer.toString(status));
+
+ if (null != message) {
+ mrConsumerResponse.setResponseMessage(message);
+ }
+ } else if (reply.startsWith("<")) {
+ mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
+ mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ } else {
+ mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
+ }
+ }
+
+ private JSONObject getResponseDataInJson(String response) {
+ try {
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ log.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+ }
+
+ private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if (null != response && response.length() == 0) {
+ return null;
+ }
+
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else if ('{' == firstChar) {
+ return null;
+ } else if ('<' == firstChar) {
+ return null;
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ }
+
+ private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+ this.longPollingMs = timeoutMs;
+ String latitude = props.getProperty("Latitude");
+ String longitude = props.getProperty("Longitude");
+ String version = props.getProperty("Version");
+ String serviceName = props.getProperty("ServiceName");
+ String env = props.getProperty("Environment");
+ String partner = props.getProperty("Partner");
+ String routeOffer = props.getProperty("routeOffer");
+ String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+ String protocol = props.getProperty("Protocol");
+ String methodType = props.getProperty("MethodType");
+ String dmeuser = props.getProperty("username");
+ String dmepassword = props.getProperty("password");
+ String contenttype = props.getProperty("contenttype");
+ String handlers = props.getProperty("sessionstickinessrequired");
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
+ * provided use the routeOffer value for auto failover within a cluster
+ */
+
+ String preferredRouteKey = readRoute("preferredRouteKey");
+
+ if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+ + "&routeoffer=" + preferredRouteKey;
+ } else if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ }
+
+ if (timeoutMs != -1)
+ url = url + "&timeout=" + timeoutMs;
+ if (limit != -1)
+ url = url + "&limit=" + limit;
+
+ // Add filter to DME2 Url
+ if (fFilter != null && fFilter.length() > 0)
+ url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+
+ DMETimeOuts = new HashMap<>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contenttype);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+
+ // SSL changes
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+ // SSL changes
+
+ long dme2PerEndPointTimeoutMs;
+ try {
+ dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+ // backward compatibility
+ if (dme2PerEndPointTimeoutMs <= 0) {
+ dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+ }
+ } catch (NumberFormatException nfe) {
+ // backward compatibility
+ dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+ getLog().debug(
+ "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
+ }
+
+ try {
+ dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+ } catch (NumberFormatException nfe) {
+ try {
+ long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
+ getLog().debug(
+ "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
+ + dme2ReplyHandlerTimeoutMs);
+ } catch (NumberFormatException e) {
+ // backward compatibility
+ dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+ getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
+ }
+ }
+ // backward compatibility
+ if (dme2ReplyHandlerTimeoutMs <= 0) {
+ dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+ }
+
+ sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ if (dmeuser != null && dmepassword != null) {
+ sender.setCredentials(dmeuser, dmepassword);
+ }
+ sender.setHeaders(DMETimeOuts);
+ sender.setPayload("");
+ if (handlers != null && handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ }
+
+ protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+ final StringBuilder contexturl = new StringBuilder(url);
+ final StringBuilder adds = new StringBuilder();
+
+ if (timeoutMs > -1) {
+ adds.append("timeout=").append(timeoutMs);
+ }
+
+ if (limit > -1) {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("limit=").append(limit);
+ }
+
+ if (fFilter != null && fFilter.length() > 0) {
+ try {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ log.error("exception at createUrlPath () : ", e);
+ }
+ }
+
+ if (adds.length() > 0) {
+ contexturl.append("?").append(adds.toString());
+ }
+
+ return contexturl.toString();
+ }
+
+ private String readRoute(String routeKey) {
+ try {
+ MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
+ } catch (Exception ex) {
+ log.error("Reply Router Error " + ex);
+ }
+ return MRClientFactory.prop.getProperty(routeKey);
+ }
+
+ public static List<String> stringToList(String str) {
+ final LinkedList<String> set = new LinkedList<>();
+ if (str != null) {
+ final String[] parts = str.trim().split(",");
+ for (String part : parts) {
+ final String trimmed = part.trim();
+ if (trimmed.length() > 0) {
+ set.add(trimmed);
+ }
+ }
+ }
+ return set;
+ }
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
+
+ public String getConsumerFilePath() {
+ return consumerFilePath;
+ }
+
+ public void setConsumerFilePath(String consumerFilePath) {
+ this.consumerFilePath = consumerFilePath;
+ }
+
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() {
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) {
+ this.authDate = authDate;
+ }
+
+ public String getfFilter() {
+ return fFilter;
+ }
+
+ public void setfFilter(String fFilter) {
+ this.fFilter = fFilter;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java
new file mode 100644
index 0000000..538f1e3
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+enum MRFormat
+{
+ /**
+ * Messages are sent using MR's message format.
+ */
+ CAMBRIA
+ {
+ @Override
+ public String toString() { return "application/cambria"; }
+ },
+
+ /**
+ * Messages are sent using MR's message format with compression.
+ */
+ CAMBRIA_ZIP
+ {
+ @Override
+ public String toString() { return "application/cambria-zip"; }
+ },
+
+ /**
+ * messages are sent as simple JSON objects.
+ */
+ JSON
+ {
+ @Override
+ public String toString() { return "application/json"; }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
new file mode 100644
index 0000000..9051b99
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
@@ -0,0 +1,266 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+import org.onap.dmaap.mr.client.MRIdentityManager;
+import org.onap.dmaap.mr.client.MRTopicManager;
+
+public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager
+{
+ private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class);
+ public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException
+ {
+ super ( baseUrls );
+ }
+
+ @Override
+ public Set<String> getTopics () throws IOException
+ {
+ final TreeSet<String> set = new TreeSet<String> ();
+ try
+ {
+ final JSONObject topicSet = get ( "/topics" );
+ final JSONArray a = topicSet.getJSONArray ( "topics" );
+ for ( int i=0; i<a.length (); i++ )
+ {
+ set.add ( a.getString ( i ) );
+ }
+ }
+ catch ( HttpObjectNotFoundException e )
+ {
+ getLog().warn ( "No /topics endpoint on service." );
+ logger.error("HttpObjectNotFoundException: ", e);
+ }
+ catch ( JSONException e )
+ {
+ getLog().warn ( "Bad /topics result from service." );
+ logger.error("JSONException: ", e);
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ return set;
+ }
+
+ @Override
+ public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ try
+ {
+ final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) );
+ return new TopicInfo ()
+ {
+ @Override
+ public String getOwner ()
+ {
+ return topicData.optString ( "owner", null );
+ }
+
+ @Override
+ public String getDescription ()
+ {
+ return topicData.optString ( "description", null );
+ }
+
+ @Override
+ public Set<String> getAllowedProducers ()
+ {
+ final JSONObject acl = topicData.optJSONObject ( "writerAcl" );
+ if ( acl != null && acl.optBoolean ( "enabled", true ) )
+ {
+ return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> getAllowedConsumers ()
+ {
+ final JSONObject acl = topicData.optJSONObject ( "readerAcl" );
+ if ( acl != null && acl.optBoolean ( "enabled", true ) )
+ {
+ return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
+ }
+ return null;
+ }
+ };
+ }
+ catch ( JSONException e )
+ {
+ throw new IOException ( e );
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ }
+
+ @Override
+ public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException
+ {
+ final JSONObject o = new JSONObject ();
+ o.put ( "topicName", topicName );
+ o.put ( "topicDescription", topicDescription );
+ o.put ( "partitionCount", partitionCount );
+ o.put ( "replicationCount", replicationCount );
+ post ( "/topics/create", o, false );
+ }
+
+ @Override
+ public void deleteTopic ( String topic ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) );
+ }
+
+ @Override
+ public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return null == getAllowedProducers ( topic );
+ }
+
+ @Override
+ public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return getTopicMetadata ( topic ).getAllowedProducers ();
+ }
+
+ @Override
+ public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ }
+
+ @Override
+ public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
+ }
+
+ @Override
+ public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return null == getAllowedConsumers ( topic );
+ }
+
+ @Override
+ public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return getTopicMetadata ( topic ).getAllowedConsumers ();
+ }
+
+ @Override
+ public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ }
+
+ @Override
+ public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
+ }
+
+ @Override
+ public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException
+ {
+ try
+ {
+ final JSONObject o = new JSONObject ();
+ o.put ( "email", email );
+ o.put ( "description", description );
+ final JSONObject reply = post ( "/apiKeys/create", o, true );
+ return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) );
+ }
+ catch ( JSONException e )
+ {
+ // the response doesn't meet our expectation
+ throw new MRApiException ( "The API key response is incomplete.", e );
+ }
+ }
+
+ @Override
+ public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) );
+ if ( keyEntry == null )
+ {
+ return null;
+ }
+
+ return new ApiKey ()
+ {
+ @Override
+ public String getEmail ()
+ {
+ final JSONObject aux = keyEntry.optJSONObject ( "aux" );
+ if ( aux != null )
+ {
+ return aux.optString ( "email" );
+ }
+ return null;
+ }
+
+ @Override
+ public String getDescription ()
+ {
+ final JSONObject aux = keyEntry.optJSONObject ( "aux" );
+ if ( aux != null )
+ {
+ return aux.optString ( "description" );
+ }
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ final JSONObject o = new JSONObject ();
+ if ( email != null ) o.put ( "email", email );
+ if ( description != null ) o.put ( "description", description );
+ patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o );
+ }
+
+ @Override
+ public void deleteCurrentApiKey () throws HttpException, IOException
+ {
+ delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) );
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
new file mode 100644
index 0000000..c5eb3ba
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
@@ -0,0 +1,927 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+import org.onap.dmaap.mr.client.HostSelector;
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
+ private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+
+ public static class Builder {
+ public Builder() {
+ }
+
+ public Builder againstUrls(Collection<String> baseUrls) {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
+ {
+ fUrls = baseUrls;
+ fServiceName = serviceName;
+ fTransportype = transportype;
+ return this;
+ }
+
+ public Builder onTopic(String topic) {
+ fTopic = topic;
+ return this;
+ }
+
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ return this;
+ }
+
+ public Builder compress(boolean compress) {
+ fCompress = compress;
+ return this;
+ }
+
+ public Builder httpThreadTime(int threadOccuranceTime) {
+ this.threadOccuranceTime = threadOccuranceTime;
+ return this;
+ }
+
+ public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
+ fAllowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public Builder withResponse(boolean withResponse) {
+ fWithResponse = withResponse;
+ return this;
+ }
+
+ public MRSimplerBatchPublisher build() {
+ if (!fWithResponse) {
+ try {
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, threadOccuranceTime);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ } else {
+ try {
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, fMaxBatchSize);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ }
+
+ private Collection<String> fUrls;
+ private Collection<String> fServiceName;
+ private String fTransportype;
+ private String fTopic;
+ private int fMaxBatchSize = 100;
+ private long fMaxBatchAgeMs = 1000;
+ private boolean fCompress = false;
+ private int threadOccuranceTime = 50;
+ private boolean fAllowSelfSignedCerts = false;
+ private boolean fWithResponse = false;
+
+ };
+
+ @Override
+ public int send(String partition, String msg) {
+ return send(new message(partition, msg));
+ }
+
+ @Override
+ public int send(String msg) {
+ return send(new message(null, msg));
+ }
+
+ @Override
+ public int send(message msg) {
+ final LinkedList<message> list = new LinkedList<message>();
+ list.add(msg);
+ return send(list);
+ }
+
+ @Override
+ public synchronized int send(Collection<message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
+ }
+
+ for (message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
+ }
+ return getPendingMessageCount();
+ }
+
+ @Override
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
+ }
+
+ @Override
+ public void close() {
+ try {
+ final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.isEmpty()) {
+ getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+ }
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
+ fClosed = true;
+
+ // stop the background sender
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
+ }
+
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ final long timeoutAtMs = now + waitInMs;
+
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
+ }
+
+ synchronized (this) {
+ final LinkedList<message> result = new LinkedList<message>();
+ fPending.drainTo(result);
+ return result;
+ }
+ }
+
+ /**
+ * Possibly send a batch to the MR server. This is called by the background
+ * thread and the close() method
+ *
+ * @param force
+ */
+ private synchronized void send(boolean force) {
+ if (force || shouldSendNow()) {
+ if (!sendBatch()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + Clock.now();
+ }
+ }
+ }
+
+ private synchronized boolean shouldSendNow() {
+ boolean shouldSend = false;
+ if (fPending.isEmpty()) {
+ final long nowMs = Clock.now();
+
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ /**
+ * Method to parse published JSON Objects and Arrays
+ *
+ * @return JSONArray
+ */
+ private JSONArray parseJSON() {
+ JSONArray jsonArray = new JSONArray();
+ for (TimestampedMessage m : fPending) {
+ JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+ JSONObject jsonObject = null;
+ JSONArray tempjsonArray = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ tempjsonArray = new JSONArray(jsonTokener);
+ if (null != tempjsonArray) {
+ for (int i = 0; i < tempjsonArray.length(); i++) {
+ jsonArray.put(tempjsonArray.getJSONObject(i));
+ }
+ }
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonArray.put(jsonObject);
+ }
+
+ }
+ return jsonArray;
+ }
+
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
+ return true;
+ }
+
+ final long nowMs = Clock.now();
+
+ if (this.fHostSelector != null) {
+ host = this.fHostSelector.selectBaseHost();
+ }
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ OutputStream os = baseStream;
+ final String contentType = props.getProperty("contenttype");
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ os.close();
+
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+
+ }
+ os.close();
+ }
+
+ final long startMs = Clock.now();
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+ username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+ protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
+ } catch (HttpException x) {
+ getLog().warn(x.getMessage(), x);
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+ public synchronized MRPublisherResponse sendBatchWithResponse() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.isEmpty()) {
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage("No Messages to send");
+ return pubResponse;
+ }
+
+ final long nowMs = Clock.now();
+
+ host = this.fHostSelector.selectBaseHost();
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+ OutputStream os = null;
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ os = baseStream;
+ final String contentType = props.getProperty("contenttype");
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+
+ }
+ }
+
+ final long startMs = Clock.now();
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+ try {
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
+
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+ final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+
+ } catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(x.getErrorCode());
+ pubResponse.setResponseMessage(x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+ } catch (Exception x) {
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+ logger.error("exception: ", x);
+
+ }
+
+ return pubResponse;
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+ authDate, username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+ password, protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (HttpException x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ }
+
+ finally {
+ if (fPending.size() > 0) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
+ pubResponse.setPendingMsgs(fPending.size());
+ }
+ if (os != null) {
+ try {
+ os.close();
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage("Error in closing Output Stream");
+ }
+ }
+ }
+
+ return pubResponse;
+ }
+
+ public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+ if (reply.isEmpty()) {
+
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ mrPubResponse.setResponseMessage("Please verify the Producer properties");
+ } else if (reply.startsWith("{")) {
+ JSONObject jObject = new JSONObject(reply);
+ if (jObject.has("message") && jObject.has("status")) {
+ String message = jObject.getString("message");
+ if (null != message) {
+ mrPubResponse.setResponseMessage(message);
+ }
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ } else {
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrPubResponse.setResponseMessage(reply);
+ }
+ } else if (reply.startsWith("<")) {
+ String responseCode = getHTTPErrorResponseCode(reply);
+ if (responseCode.contains("403")) {
+ responseCode = "403";
+ }
+ mrPubResponse.setResponseCode(responseCode);
+ mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }
+
+ return mrPubResponse;
+ }
+
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxBatchAgeMs;
+ private final boolean fCompress;
+ private int threadOccuranceTime;
+ private boolean fClosed;
+ private String username;
+ private String password;
+ private String host;
+
+ // host selector
+ private HostSelector fHostSelector = null;
+
+ private final LinkedBlockingQueue<TimestampedMessage> fPending;
+ private long fDontSendUntilMs;
+ private final ScheduledThreadPoolExecutor fExec;
+
+ private String latitude;
+ private String longitude;
+ private String version;
+ private String serviceName;
+ private String env;
+ private String partner;
+ private String routeOffer;
+ private String subContextPath;
+ private String protocol;
+ private String methodType;
+ private String url;
+ private String dmeuser;
+ private String dmepassword;
+ private String contentType;
+ private static final long sfWaitAfterError = 10000;
+ private HashMap<String, String> DMETimeOuts;
+ private DME2Client sender;
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ private String authKey;
+ private String authDate;
+ private String handlers;
+ private Properties props;
+ public static String routerFilePath;
+ protected static final Map<String, String> headers = new HashMap<String, String>();
+ public static MultivaluedMap<String, Object> headersMap;
+
+ private MRPublisherResponse pubResponse;
+
+ public MRPublisherResponse getPubResponse() {
+ return pubResponse;
+ }
+
+ public void setPubResponse(MRPublisherResponse pubResponse) {
+ this.pubResponse = pubResponse;
+ }
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
+
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ }
+
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+ private void DME2Configue() throws Exception {
+ try {
+
+ latitude = props.getProperty("Latitude");
+ longitude = props.getProperty("Longitude");
+ version = props.getProperty("Version");
+ serviceName = props.getProperty("ServiceName");
+ env = props.getProperty("Environment");
+ partner = props.getProperty("Partner");
+ routeOffer = props.getProperty("routeOffer");
+ subContextPath = props.getProperty("SubContextPath") + fTopic;
+
+ protocol = props.getProperty("Protocol");
+ methodType = props.getProperty("MethodType");
+ dmeuser = props.getProperty("username");
+ dmepassword = props.getProperty("password");
+ contentType = props.getProperty("contenttype");
+ handlers = props.getProperty("sessionstickinessrequired");
+ routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover
+ * between data centers When Partner value is not provided use the
+ * routeOffer value for auto failover within a cluster
+ */
+
+ String partitionKey = props.getProperty("partition");
+
+ if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+ + partner;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ }
+
+ DMETimeOuts = new HashMap<String, String>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contentType);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ // SSL changes
+ // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+
+ // SSL changes
+
+ sender = new DME2Client(new URI(url), 5000L);
+
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ sender.setCredentials(dmeuser, dmepassword);
+ sender.setHeaders(DMETimeOuts);
+ if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ } catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new URISyntaxException(url, x.getMessage());
+ } catch (Exception x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new IllegalArgumentException(x.getMessage());
+ }
+ }
+
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor(1);
+ pubResponse = new MRPublisherResponse();
+
+ }
+
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+ threadOccuranceTime = httpThreadOccurnace;
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ send(false);
+ }
+ }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
+ pubResponse = new MRPublisherResponse();
+ }
+
+ private static class TimestampedMessage extends message {
+ public TimestampedMessage(message m) {
+ super(m);
+ timestamp = Clock.now();
+ }
+
+ public final long timestamp;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() {
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) {
+ this.authDate = authDate;
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/response/MRConsumerResponse.java b/src/main/java/org/onap/dmaap/mr/client/response/MRConsumerResponse.java
new file mode 100644
index 0000000..5dc3da0
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/response/MRConsumerResponse.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.response;
+
+public class MRConsumerResponse {
+
+ private String responseCode;
+
+ private String responseMessage;
+
+ private Iterable<String> actualMessages;
+
+
+
+
+ public String getResponseCode() {
+ return responseCode;
+ }
+
+ public void setResponseCode(String responseCode) {
+ this.responseCode = responseCode;
+ }
+
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ public void setResponseMessage(String responseMessage) {
+ this.responseMessage = responseMessage;
+ }
+
+ public Iterable<String> getActualMessages() {
+ return actualMessages;
+ }
+
+ public void setActualMessages(Iterable<String> actualMessages) {
+ this.actualMessages = actualMessages;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/client/response/MRPublisherResponse.java b/src/main/java/org/onap/dmaap/mr/client/response/MRPublisherResponse.java
new file mode 100644
index 0000000..f0630be
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/client/response/MRPublisherResponse.java
@@ -0,0 +1,67 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.client.response;
+
+/**
+ * Response for Publisher
+ * @author author
+ *
+ */
+public class MRPublisherResponse {
+ private String responseCode;
+
+ private String responseMessage;
+
+ private int pendingMsgs;
+
+ public String getResponseCode() {
+ return responseCode;
+ }
+
+ public void setResponseCode(String responseCode) {
+ this.responseCode = responseCode;
+ }
+
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ public void setResponseMessage(String responseMessage) {
+ this.responseMessage = responseMessage;
+ }
+
+ public int getPendingMsgs() {
+ return pendingMsgs;
+ }
+
+ public void setPendingMsgs(int pendingMsgs) {
+ this.pendingMsgs = pendingMsgs;
+ }
+
+ @Override
+ public String toString() {
+ return "Response Code:" + this.responseCode + ","
+ + "Response Message:" + this.responseMessage + "," + "Pending Messages Count"
+ + this.pendingMsgs;
+ }
+
+}