aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/nsa')
-rw-r--r--src/main/java/com/att/nsa/mr/client/HostSelector.java198
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java55
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClient.java66
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientBuilders.java449
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientFactory.java691
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRConsumer.java54
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRIdentityManager.java100
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRPublisher.java93
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRTopicManager.java183
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/Clock.java63
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/DmaapClientUtil.java85
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java395
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java487
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java59
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConstants.java179
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java675
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRFormat.java52
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java266
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java927
-rw-r--r--src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java60
-rw-r--r--src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java67
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java52
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java63
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java72
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java54
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java88
-rw-r--r--src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java135
-rw-r--r--src/main/java/com/att/nsa/mr/logging/MRAppender.java165
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java95
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java44
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java87
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java86
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java84
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java95
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java98
-rw-r--r--src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java84
-rw-r--r--src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java184
-rw-r--r--src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java169
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java141
-rw-r--r--src/main/java/com/att/nsa/mr/tools/AuthCommand.java69
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ClusterCommand.java80
-rw-r--r--src/main/java/com/att/nsa/mr/tools/MRCommandContext.java100
-rw-r--r--src/main/java/com/att/nsa/mr/tools/MRTool.java49
-rw-r--r--src/main/java/com/att/nsa/mr/tools/MessageCommand.java131
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ToolsUtil.java40
-rw-r--r--src/main/java/com/att/nsa/mr/tools/TopicCommand.java221
-rw-r--r--src/main/java/com/att/nsa/mr/tools/TraceCommand.java118
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java174
48 files changed, 0 insertions, 7982 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/HostSelector.java b/src/main/java/com/att/nsa/mr/client/HostSelector.java
deleted file mode 100644
index 63ef404..0000000
--- a/src/main/java/com/att/nsa/mr/client/HostSelector.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.Vector;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HostSelector
-{
- private final TreeSet<String> fBaseHosts;
- private final DelayQueue<BlacklistEntry> fBlacklist;
- private String fIdealHost;
- private String fCurrentHost;
- private static final Logger log = LoggerFactory.getLogger(HostSelector.class);
-
- public HostSelector(String hostPart)
- {
- this(makeSet(hostPart), null);
- }
-
- public HostSelector(Collection<String> baseHosts)
- {
- this(baseHosts, null);
- }
-
- public HostSelector(Collection<String> baseHosts, String signature)
- {
- if (baseHosts.isEmpty())
- {
- throw new IllegalArgumentException("At least one host must be provided.");
- }
-
- this.fBaseHosts = new TreeSet(baseHosts);
- this.fBlacklist = new DelayQueue();
- this.fIdealHost = null;
-
- if (signature == null) {
- return;
- }
- int index = 0 ;
- int value = signature.hashCode();
- if(value!=0) {
- index = Math.abs(value) % baseHosts.size();
- }
- Iterator it = this.fBaseHosts.iterator();
- while (index-- > 0)
- {
- it.next();
- }
- this.fIdealHost = ((String)it.next());
- }
-
- public String selectBaseHost()
- {
- if (this.fCurrentHost == null)
- {
- makeSelection();
- }
- return this.fCurrentHost;
- }
-
- public void reportReachabilityProblem(long blacklistUnit, TimeUnit blacklistTimeUnit)
- {
- if (this.fCurrentHost == null)
- {
- log.warn("Reporting reachability problem, but no host is currently selected.");
- }
-
- if (blacklistUnit > 0L)
- {
- for (BlacklistEntry be : this.fBlacklist)
- {
- if (be.getHost().equals(this.fCurrentHost))
- {
- be.expireNow();
- }
- }
-
- LinkedList devNull = new LinkedList();
- this.fBlacklist.drainTo(devNull);
-
- if (this.fCurrentHost != null)
- {
- this.fBlacklist.add(new BlacklistEntry(this.fCurrentHost, TimeUnit.MILLISECONDS.convert(blacklistUnit, blacklistTimeUnit)));
- }
- }
- this.fCurrentHost = null;
- }
-
- private String makeSelection()
- {
- TreeSet workingSet = new TreeSet(this.fBaseHosts);
-
- LinkedList devNull = new LinkedList();
- this.fBlacklist.drainTo(devNull);
- for (BlacklistEntry be : this.fBlacklist)
- {
- workingSet.remove(be.getHost());
- }
-
- if (workingSet.isEmpty())
- {
- log.warn("All hosts were blacklisted; reverting to full set of hosts.");
- workingSet.addAll(this.fBaseHosts);
- this.fCurrentHost = null;
- }
-
- String selection = null;
- if ((this.fCurrentHost != null) && (workingSet.contains(this.fCurrentHost)))
- {
- selection = this.fCurrentHost;
- }
- else if ((this.fIdealHost != null) && (workingSet.contains(this.fIdealHost)))
- {
- selection = this.fIdealHost;
- }
- else
- {
- int index = 0;
- int value = new Random().nextInt();
- Vector v = new Vector(workingSet);
- if(value!=0) {
- index = Math.abs(value) % workingSet.size();
- }
- selection = (String)v.elementAt(index);
- }
-
- this.fCurrentHost = selection;
- return this.fCurrentHost;
- }
-
- private static Set<String> makeSet(String s)
- {
- TreeSet set = new TreeSet();
- set.add(s);
- return set; }
-
- private static class BlacklistEntry implements Delayed {
- private final String fHost;
- private long fExpireAtMs;
-
- public BlacklistEntry(String host, long delayMs) {
- this.fHost = host;
- this.fExpireAtMs = (System.currentTimeMillis() + delayMs);
- }
-
- public void expireNow()
- {
- this.fExpireAtMs = 0L;
- }
-
- public String getHost()
- {
- return this.fHost;
- }
-
- public int compareTo(Delayed o)
- {
- Long thisDelay = Long.valueOf(getDelay(TimeUnit.MILLISECONDS));
- return thisDelay.compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS)));
- }
-
- public long getDelay(TimeUnit unit)
- {
- long remainingMs = this.fExpireAtMs - System.currentTimeMillis();
- return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java b/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java
deleted file mode 100644
index 875b5a3..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRBatchingPublisher.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-
-/**
- * A MR batching publisher is a publisher with additional functionality
- * for managing delayed sends.
- *
- * @author author
- *
- */
-public interface MRBatchingPublisher extends MRPublisher
-{
- /**
- * Get the number of messages that have not yet been sent.
- * @return the number of pending messages
- */
- int getPendingMessageCount ();
-
- /**
- * Close this publisher, sending any remaining messages.
- * @param timeout an amount of time to wait for unsent messages to be sent
- * @param timeoutUnits the time unit for the timeout arg
- * @return a list of any unsent messages after the timeout
- * @throws IOException exception
- * @throws InterruptedException exception
- */
- List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException;
-
- MRPublisherResponse sendBatchWithResponse ();
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRClient.java b/src/main/java/com/att/nsa/mr/client/MRClient.java
deleted file mode 100644
index f3a8f43..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRClient.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import org.slf4j.Logger;
-
-
-public interface MRClient
-{
- /**
- * An exception at the MR layer. This is used when the HTTP transport
- * layer returns a success code but the transaction is not completed as expected.
- */
- public class MRApiException extends Exception
- {
- private static final long serialVersionUID = 1L;
- public MRApiException ( String msg ) { super ( msg ); }
- public MRApiException ( String msg, Throwable t ) { super ( msg, t ); }
- }
-
- /**
- * Optionally set the Logger to use
- * @param log log
- */
- void logTo ( Logger log );
-
- /**
- * Set the API credentials for this client connection. Subsequent calls will
- * include authentication headers.who i
- */
- /**
- * @param apiKey apikey
- * @param apiSecret apisec
- */
- void setApiCredentials ( String apiKey, String apiSecret );
-
- /**
- * Remove API credentials, if any, on this connection. Subsequent calls will not include
- * authentication headers.
- */
- void clearApiCredentials ();
-
- /**
- * Close this connection. Some client interfaces have additional close capability.
- */
- void close ();
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java b/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java
deleted file mode 100644
index 73ef5c4..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRClientBuilders.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Modifications Copyright © 2018 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-import com.att.nsa.mr.client.impl.MRMetaClient;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-
-/**
- * A collection of builders for various types of MR API clients
- *
- * @author author
- */
-public class MRClientBuilders
-{
-
- /**
- * Instantiates MRClientBuilders.
- */
- private MRClientBuilders() {
- // prevent instantiation
- }
-
- /**
- * A builder for a topic Consumer
- * @author author
- */
- public static class ConsumerBuilder
- {
- /**
- * Construct a consumer builder.
- */
- public ConsumerBuilder () {}
-
- /**
- * Set the host list
- * @param hostList a comma-separated list of hosts to use to connect to MR
- * @return this builder
- */
- public ConsumerBuilder usingHosts ( String hostList ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostList) );
- }
-
- /**
- * Set the host list
- * @param hostSet a set of hosts to use to connect to MR
- * @return this builder
- */
- public ConsumerBuilder usingHosts ( Collection<String> hostSet ) {
- fHosts = hostSet; return this;
- }
-
- /**
- * Set the topic
- * @param topic the name of the topic to consume
- * @return this builder
- */
- public ConsumerBuilder onTopic ( String topic ) {
- fTopic=topic;
- return this;
- }
-
- /**
- * Set the consumer's group and ID
- * @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consumer in its group
- * @return this builder
- */
- public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) {
- fGroup = consumerGroup;
- fId = consumerId;
- return this;
- }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
- }
-
- /**
- * Set the server side timeout
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
- * @return this builder
- */
- public ConsumerBuilder waitAtServer ( int timeoutMs ) {
- fTimeoutMs = timeoutMs;
- return this;
- };
-
- /**
- * Set the maximum number of messages to receive per transaction
- * @param limit The maximum number of messages to receive from the server in one transaction.
- * @return this builder
- */
- public ConsumerBuilder receivingAtMost ( int limit ) {
- fLimit = limit;
- return this;
- };
-
- /**
- * Set a filter to use on the server
- * @param filter a Highland Park standard library filter encoded in JSON
- * @return this builder
- */
- public ConsumerBuilder withServerSideFilter ( String filter ) {
- fFilter = filter;
- return this;
- }
-
- /**
- * Build the consumer
- * @return a consumer
- */
- public MRConsumer build ()
- {
- if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- if ( fGroup == null )
- {
- fGroup = UUID.randomUUID ().toString ();
- fId = "0";
- log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
- }
-
- if ( sfConsumerMock != null ) return sfConsumerMock;
- try {
- return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private Collection<String> fHosts = null;
- private String fTopic = null;
- private String fGroup = null;
- private String fId = null;
- private String fApiKey = null;
- private String fApiSecret = null;
- private int fTimeoutMs = -1;
- private int fLimit = -1;
- private String fFilter = null;
- }
-
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
-
- /**
- * A publisher builder
- * @author author
- */
- public static class PublisherBuilder
- {
- public PublisherBuilder () {}
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
- * @return this builder
- */
- public PublisherBuilder usingHosts ( String hostlist ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostlist) );
- }
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
- * @return this builder
- */
- public PublisherBuilder usingHosts ( String[] hostSet )
- {
- final TreeSet<String> hosts = new TreeSet<String> ();
- for ( String hp : hostSet )
- {
- hosts.add ( hp );
- }
- return usingHosts ( hosts );
- }
-
- /**
- * Set the MR/UEB host(s) to use
- * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
- * @return this builder
- */
- public PublisherBuilder usingHosts ( Collection<String> hostlist ) {
- fHosts=hostlist;
- return this;
- }
-
- /**
- * Set the topic to publish on
- * @param topic The topic on which to publish messages.
- * @return this builder
- */
- public PublisherBuilder onTopic ( String topic ) {
- fTopic = topic;
- return this;
- }
-
- /**
- * Batch message sends with the given limits.
- * @param messageCount The largest set of messages to batch.
- * @param ageInMs The maximum age of a message waiting in a batch.
- * @return this builder
- */
- public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) {
- fMaxBatchSize = messageCount;
- fMaxBatchAgeMs = ageInMs;
- return this;
- }
-
- /**
- * Compress transactions
- * @return this builder
- */
- public PublisherBuilder withCompresion () {
- return enableCompresion(true);
- }
-
- /**
- * Do not compress transactions
- * @return this builder
- */
- public PublisherBuilder withoutCompresion () {
- return enableCompresion(false);
- }
-
- /**
- * Set the compression option
- * @param compress true to gzip compress transactions
- * @return this builder
- */
- public PublisherBuilder enableCompresion ( boolean compress ) {
- fCompress = compress;
- return this;
- }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
- }
-
- /**
- * Build the publisher
- * @return a batching publisher
- */
- public MRBatchingPublisher build ()
- {
- if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- if ( sfPublisherMock != null ) return sfPublisherMock;
-
- final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls ( fHosts ).
- onTopic ( fTopic ).
- batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
- compress ( fCompress ).
- build ();
- if ( fApiKey != null )
- {
- pub.setApiCredentials ( fApiKey, fApiSecret );
- }
- return pub;
- }
-
- private Collection<String> fHosts = null;
- private String fTopic = null;
- private int fMaxBatchSize = 1;
- private int fMaxBatchAgeMs = 1;
- private boolean fCompress = false;
- private String fApiKey = null;
- private String fApiSecret = null;
- }
-
- /**
- * A builder for an identity manager
- * @author author
- */
- public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
- {
- /**
- * Construct an identity manager builder.
- */
- public IdentityManagerBuilder () {}
-
- @Override
- protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
- }
-
- /**
- * A builder for a topic manager
- * @author author
- */
- public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
- {
- /**
- * Construct an topic manager builder.
- */
- public TopicManagerBuilder () {}
-
- @Override
- protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
- }
-
- /**
- * Inject a consumer. Used to support unit tests.
- * @param cc
- */
- public static void $testInject ( MRConsumer cc )
- {
- sfConsumerMock = cc;
- }
-
- /**
- * Inject a publisher. Used to support unit tests.
- * @param pub
- */
- public static void $testInject ( MRBatchingPublisher pub )
- {
- sfPublisherMock = pub;
- }
-
- static MRConsumer sfConsumerMock = null;
- static MRBatchingPublisher sfPublisherMock = null;
-
- /**
- * A builder for an identity manager
- * @author author
- */
- public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
- {
- /**
- * Construct an identity manager builder.
- */
- public AbstractAuthenticatedManagerBuilder () {}
-
- /**
- * Set the host list
- * @param hostList a comma-separated list of hosts to use to connect to MR
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostList) );
- }
-
- /**
- * Set the host list
- * @param hostSet a set of hosts to use to connect to MR
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) {
- fHosts = hostSet;
- return this;
- }
-
- /**
- * Set the API key and secret for this client.
- * @param apiKey
- * @param apiSecret
- * @return this builder
- */
- public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
- }
-
- /**
- * Build the consumer
- * @return a consumer
- */
- public T build ()
- {
- if ( fHosts.isEmpty() )
- {
- throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
- }
-
- final T mgr = constructClient ( fHosts );
- mgr.setApiCredentials ( fApiKey, fApiSecret );
- return mgr;
- }
-
- protected abstract T constructClient ( Collection<String> hosts );
-
- private Collection<String> fHosts = null;
- private String fApiKey = null;
- private String fApiSecret = null;
- }
-
- private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
deleted file mode 100644
index 689190e..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ /dev/null
@@ -1,691 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Modifications Copyright © 2018 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-import com.att.nsa.mr.client.impl.MRMetaClient;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-import com.att.nsa.mr.tools.ValidatorUtil;
-
-/**
- * A factory for MR clients.<br/>
- * <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept
- * a consumer group name, then it creates a consumer that is not restartable.
- * That is, if you stop your process and start it again, your client will NOT
- * receive any missed messages on the topic. If you need to ensure receipt of
- * missed messages, then you must use a consumer that's created with a group
- * name and ID. (If you create multiple consumer processes using the same group,
- * load is split across them. Be sure to use a different ID for each
- * instance.)<br/>
- * <br/>
- * Publishers
- *
- * @author author
- */
-public class MRClientFactory {
- private static final String AUTH_KEY = "authKey";
- private static final String AUTH_DATE = "authDate";
- private static final String PASSWORD = "password";
- private static final String USERNAME = "username";
- private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
- private static final String TOPIC = "topic";
- private static final String TRANSPORT_TYPE = "TransportType";
- public static MultivaluedMap<String, Object> HTTPHeadersMap;
- public static Map<String, String> DME2HeadersMap;
- public static String routeFilePath;
-
- public static FileReader routeReader;
-
- public static FileWriter routeWriter = null;
- public static Properties prop = null;
-
- /**
- * Instantiates MRClientFactory.
- */
- private MRClientFactory() {
- //prevents instantiation.
- }
-
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "hostname:8080,"
- *
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(String hostList, String topic) {
- return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
- }
-
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
- return createConsumer(hostSet, topic, null);
- }
-
- /**
- * Create a consumer instance with server-side filtering, the default
- * timeout, and no limit on messages returned. This consumer operates as an
- * independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param filter
- * a filter to use on the server side
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
- return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
- return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
- filter, apiKey, apiSecret);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostSet
- * The host used in the URL to MR. Entries can be "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- *
- * @return a consumer
- */
- public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
- if (MRClientBuilders.sfConsumerMock != null)
- return MRClientBuilders.sfConsumerMock;
- try {
- return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
- apiSecret);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
-
- /**
- * Create a publisher that sends each message (or group of messages)
- * immediately. Most applications should favor higher latency for much
- * higher message throughput and the "simple publisher" is not a good
- * choice.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @return a publisher
- */
- public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
- return createBatchingPublisher(hostlist, topic, 1, 1);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown. Message payloads are
- * not compressed.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs) {
- return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostlist
- * The host used in the URL to MR. Can be "host:port", can be
- * multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
- return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
- final TreeSet<String> hosts = new TreeSet<String>();
- for (String hp : hostSet) {
- hosts.add(hp);
- }
- return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
- int maxBatchSize, long maxAgeMs, boolean compress) {
- return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
- .compress(compress).build();
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param host
- * A host to be used in the URL to MR. Can be "host:port". Use
- * multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param username
- * username
- * @param password
- * password
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- * @param protocolFlag
- * http auth or ueb auth or dme2 method
- * @param producerFilePath
- * all properties for publisher
- * @return MRBatchingPublisher obj
- */
- public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
- final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
- MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
- .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
- .compress(compress).build();
-
- pub.setHost(host);
- pub.setUsername(username);
- pub.setPassword(password);
- pub.setProtocolFlag(protocolFlag);
- return pub;
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param Properties
- * props set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
- throws FileNotFoundException, IOException {
- return createInternalBatchingPublisher(props, withResponse);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param Properties
- * props set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(Properties props)
- throws FileNotFoundException, IOException {
- return createInternalBatchingPublisher(props, false);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown
- *
- * @param producerFilePath
- * set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
- throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
- Properties props = new Properties();
- props.load(reader);
- return createBatchingPublisher(props);
- }
-
- /**
- * Create a publisher that will contain send methods that return response
- * object to user.
- *
- * @param producerFilePath
- * set all properties for publishing message
- * @return MRBatchingPublisher obj
- * @throws FileNotFoundException
- * exc
- * @throws IOException
- * ioex
- */
- public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
- throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
- Properties props = new Properties();
- props.load(reader);
- return createBatchingPublisher(props, withResponse);
- }
-
- protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
- throws FileNotFoundException, IOException {
- assert props != null;
- MRSimplerBatchPublisher pub;
- if (withResponse) {
- pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
- .onTopic(props.getProperty(TOPIC))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
- .withResponse(withResponse).build();
- } else {
- pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
- .onTopic(props.getProperty(TOPIC))
- .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
- Integer.parseInt(props.getProperty("maxAgeMs").toString()))
- .compress(Boolean.parseBoolean(props.getProperty("compress")))
- .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
- }
- pub.setHost(props.getProperty("host"));
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
-
- pub.setAuthKey(props.getProperty(AUTH_KEY));
- pub.setAuthDate(props.getProperty(AUTH_DATE));
- pub.setUsername(props.getProperty(USERNAME));
- pub.setPassword(props.getProperty(PASSWORD));
- } else {
- pub.setUsername(props.getProperty(USERNAME));
- pub.setPassword(props.getProperty(PASSWORD));
- }
- pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
- pub.setProps(props);
- prop = new Properties();
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
- routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
- routeReader = new FileReader(new File(routeFilePath));
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- }
- return pub;
- }
-
- /**
- * Create an identity manager client to work with API keys.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return an identity manager
- */
- public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
- MRIdentityManager cim;
- try {
- cim = new MRMetaClient(hostSet);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- cim.setApiCredentials(apiKey, apiSecret);
- return cim;
- }
-
- /**
- * Create a topic manager for working with topics.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to MR. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return a topic manager
- */
- public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
- MRMetaClient tmi;
- try {
- tmi = new MRMetaClient(hostSet);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- tmi.setApiCredentials(apiKey, apiSecret);
- return tmi;
- }
-
- /**
- * Inject a consumer. Used to support unit tests.
- *
- * @param cc
- */
- public static void $testInject(MRConsumer cc) {
- MRClientBuilders.sfConsumerMock = cc;
- }
-
- public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, int i, int j, String protocalFlag, String consumerFilePath) {
-
- MRConsumerImpl sub;
- try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- sub.setUsername(username);
- sub.setPassword(password);
- sub.setHost(host);
- sub.setProtocolFlag(protocalFlag);
- sub.setConsumerFilePath(consumerFilePath);
- return sub;
-
- }
-
- public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
- String id, String protocalFlag, String consumerFilePath, int i, int j) {
-
- MRConsumerImpl sub;
- try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- sub.setUsername(username);
- sub.setPassword(password);
- sub.setHost(host);
- sub.setProtocolFlag(protocalFlag);
- sub.setConsumerFilePath(consumerFilePath);
- return sub;
-
- }
-
- public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(consumerFilePath));
- Properties props = new Properties();
- props.load(reader);
-
- return createConsumer(props);
- }
-
- public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
- int timeout;
- ValidatorUtil.validateSubscriber(props);
- if (props.getProperty("timeout") != null)
- timeout = Integer.parseInt(props.getProperty("timeout"));
- else
- timeout = -1;
- int limit;
- if (props.getProperty("limit") != null)
- limit = Integer.parseInt(props.getProperty("limit"));
- else
- limit = -1;
- String group;
- if (props.getProperty("group") == null)
- group = UUID.randomUUID().toString();
- else
- group = props.getProperty("group");
- MRConsumerImpl sub = null;
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
- sub.setAuthKey(props.getProperty(AUTH_KEY));
- sub.setAuthDate(props.getProperty(AUTH_DATE));
- sub.setUsername(props.getProperty(USERNAME));
- sub.setPassword(props.getProperty(PASSWORD));
- } else {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty(USERNAME), props.getProperty(PASSWORD));
- sub.setUsername(props.getProperty(USERNAME));
- sub.setPassword(props.getProperty(PASSWORD));
- }
-
- sub.setProps(props);
- sub.setHost(props.getProperty("host"));
- sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
- sub.setfFilter(props.getProperty("filter"));
- if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
- MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
- routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
- routeReader = new FileReader(new File(routeFilePath));
- prop = new Properties();
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- }
-
- return sub;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRConsumer.java b/src/main/java/com/att/nsa/mr/client/MRConsumer.java
deleted file mode 100644
index 444eb7c..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRConsumer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.IOException;
-
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-
-public interface MRConsumer extends MRClient
-{
- /**
- * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call.
-
- * @return a set of messages
- * @throws IOException
- */
- Iterable<String> fetch () throws IOException, Exception;
-
- /**
- * Fetch a set of messages with an explicit timeout and limit for this call. These values
- * override any set in the constructor call.
- *
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection
- * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side).
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @return a set messages
- * @throws IOException if there's a problem connecting to the server
- */
- Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException, Exception;
-
- MRConsumerResponse fetchWithReturnConsumerResponse ();
-
-
- MRConsumerResponse fetchWithReturnConsumerResponse ( int timeoutMs, int limit );
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java b/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java
deleted file mode 100644
index 5483761..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRIdentityManager.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.IOException;
-
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-
-/**
- * A client for manipulating API keys.
- * @author author
- *
- */
-public interface MRIdentityManager extends MRClient
-{
- /**
- * An API Key record
- */
- public interface ApiKey
- {
- /**
- * Get the email address associated with the API key
- * @return the email address on the API key or null
- */
- String getEmail ();
-
- /**
- * Get the description associated with the API key
- * @return the description on the API key or null
- */
- String getDescription ();
- }
-
- /**
- * Create a new API key on the UEB cluster. The returned credential instance
- * contains the new API key and API secret. This is the only time the secret
- * is available to the client -- there's no API for retrieving it later -- so
- * your application must store it securely.
- *
- * @param email
- * @param description
- * @return a new credential
- * @throws HttpException
- * @throws MRApiException
- * @throws IOException
- */
- ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException;
-
- /**
- * Get basic info about a known API key
- * @param apiKey
- * @return the API key's info or null if it doesn't exist
- * @throws HttpObjectNotFoundException, HttpException, MRApiException
- * @throws IOException
- */
- ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, MRApiException, IOException;
-
- /**
- * Update the record for the API key used to authenticate this request. The UEB
- * API requires that you authenticate with the same key you're updating, so the
- * API key being changed is the one used for setApiCredentials.
- *
- * @param email use null to keep the current value
- * @param description use null to keep the current value
- * @throws IOException
- * @throws HttpException
- * @throws HttpObjectNotFoundException
- */
- void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException;
-
- /**
- * Delete the *current* API key. After this call returns, the API key
- * used to authenticate will no longer be valid.
- *
- * @throws IOException
- * @throws HttpException
- */
- void deleteCurrentApiKey () throws HttpException, IOException;
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRPublisher.java b/src/main/java/com/att/nsa/mr/client/MRPublisher.java
deleted file mode 100644
index 651c548..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRPublisher.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A MR publishing interface.
- *
- */
-public interface MRPublisher extends MRClient
-{
- /**
- * A simple message container
- */
- public static class message
- {
- public message ( String partition, String msg )
- {
- fPartition = partition == null ? "" : partition;
- fMsg = msg;
- if ( fMsg == null )
- {
- throw new IllegalArgumentException ( "Can't send a null message." );
- }
- }
-
- public message ( message msg )
- {
- this ( msg.fPartition, msg.fMsg );
- }
-
- public final String fPartition;
- public final String fMsg;
- }
-
- /**
- * Send the given message without partition. partition will be placed at HTTP request level.
- * @param msg message to sent
- * @return the number of pending messages
- * @throws IOException exception
- */
- int send ( String msg ) throws IOException;
- /**
- * Send the given message using the given partition.
- * @param partition partition
- * @param msg message
- * @return the number of pending messages
- * @throws IOException exception
- */
- int send ( String partition, String msg ) throws IOException;
-
- /**
- * Send the given message using its partition.
- * @param msg mesg
- * @return the number of pending messages
- * @throws IOException exp
- */
- int send ( message msg ) throws IOException;
-
- /**
- * Send the given messages using their partitions.
- * @param msgs msg
- * @return the number of pending messages
- * @throws IOException exp
- */
- int send ( Collection<message> msgs ) throws IOException;
-
- /**
- * Close this publisher. It's an error to call send() after close()
- */
- void close ();
-}
diff --git a/src/main/java/com/att/nsa/mr/client/MRTopicManager.java b/src/main/java/com/att/nsa/mr/client/MRTopicManager.java
deleted file mode 100644
index 13524bd..0000000
--- a/src/main/java/com/att/nsa/mr/client/MRTopicManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client;
-
-import java.io.IOException;
-import java.util.Set;
-
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-
-
-/**
- * A client for working with topic metadata.
- * @author author
- */
-public interface MRTopicManager extends MRClient
-{
- /**
- * Get the topics available in the cluster
- * @return a set of topic names
- * @throws IOException
- */
- Set<String> getTopics () throws IOException;
-
- /**
- * Information about a topic.
- */
- public interface TopicInfo
- {
- /**
- * Get the owner of the topic
- * @return the owner, or null if no entry
- */
- String getOwner ();
-
- /**
- * Get the description for this topic
- * @return the description, or null if no entry
- */
- String getDescription ();
-
- /**
- * Get the set of allowed producers (as API keys) on this topic
- * @return the set of allowed producers, null of no ACL exists/enabled
- */
- Set<String> getAllowedProducers ();
-
- /**
- * Get the set of allowed consumers (as API keys) on this topic
- * @return the set of allowed consumers, null of no ACL exists/enabled
- */
- Set<String> getAllowedConsumers ();
- }
-
- /**
- * Get information about a topic.
- * @param topic
- * @return topic information
- * @throws IOException
- * @throws HttpObjectNotFoundException
- */
- TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException;
-
- /**
- * Create a new topic.
- * @param topicName
- * @param topicDescription
- * @param partitionCount
- * @param replicationCount
- * @throws HttpException
- * @throws IOException
- */
- void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException;
-
- /**
- * Delete the topic. This call must be authenticated and the API key listed as owner on the topic.
- * NOTE: The MR (UEB) API server does not support topic deletion at this time (mid 2015)
- * @param topic
- * @throws HttpException
- * @throws IOException
- * @deprecated If/when the Kafka system supports topic delete, or the implementation changes, this will be restored.
- */
- @Deprecated
- void deleteTopic ( String topic ) throws HttpException, IOException;
-
- /**
- * Can any client produce events into this topic without authentication?
- * @param topic
- * @return true if the topic is open for producing
- * @throws IOException
- * @throws HttpObjectNotFoundException
- */
- boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException;
-
- /**
- * Get the set of allowed producers. If the topic is open, the result is null.
- * @param topic
- * @return a set of allowed producers or null
- * @throws IOException
- * @throws HttpObjectNotFoundException
- */
- Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException;
-
- /**
- * Allow the given API key to produce messages on the given topic. The caller must
- * own this topic.
- * @param topic
- * @param apiKey
- * @throws HttpException
- * @throws HttpObjectNotFoundException
- * @throws IOException
- */
- void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
-
- /**
- * Revoke the given API key's authorization to produce messages on the given topic.
- * The caller must own this topic.
- * @param topic
- * @param apiKey
- * @throws HttpException
- * @throws IOException
- */
- void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException;
-
- /**
- * Can any client consume events from this topic without authentication?
- * @param topic
- * @return true if the topic is open for consuming
- * @throws IOException
- * @throws HttpObjectNotFoundException
- */
- boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException;
-
- /**
- * Get the set of allowed consumers. If the topic is open, the result is null.
- * @param topic
- * @return a set of allowed consumers or null
- * @throws IOException
- * @throws HttpObjectNotFoundException
- */
- Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException;
-
- /**
- * Allow the given API key to consume messages on the given topic. The caller must
- * own this topic.
- * @param topic
- * @param apiKey
- * @throws HttpException
- * @throws HttpObjectNotFoundException
- * @throws IOException
- */
- void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
-
- /**
- * Revoke the given API key's authorization to consume messages on the given topic.
- * The caller must own this topic.
- * @param topic
- * @param apiKey
- * @throws HttpException
- * @throws IOException
- */
- void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException;
-}
-
diff --git a/src/main/java/com/att/nsa/mr/client/impl/Clock.java b/src/main/java/com/att/nsa/mr/client/impl/Clock.java
deleted file mode 100644
index ace791e..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/Clock.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-public class Clock
-{
- public synchronized static Clock getIt ()
- {
- if ( sfClock == null )
- {
- sfClock = new Clock ();
- }
- return sfClock;
- }
-
- /**
- * Get the system's current time in milliseconds.
- * @return the current time
- */
- public static long now ()
- {
- return getIt().nowImpl ();
- }
-
- /**
- * Get current time in milliseconds
- * @return current time in ms
- */
- protected long nowImpl ()
- {
- return System.currentTimeMillis ();
- }
-
- protected Clock ()
- {
- }
-
- private static Clock sfClock = null;
-
- protected synchronized static void register ( Clock testClock )
- {
- sfClock = testClock;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/DmaapClientUtil.java b/src/main/java/com/att/nsa/mr/client/impl/DmaapClientUtil.java
deleted file mode 100644
index a3a2ce1..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/DmaapClientUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
-import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
-
-public class DmaapClientUtil {
-
- private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
- private static final String MR_DATE_CONSTANT = "X-CambriaDate";
-
- public static WebTarget getTarget(final String path, final String username, final String password) {
-
- Client client = ClientBuilder.newClient();
- HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
- client.register(feature);
-
- return client.target(path);
- }
-
- public static WebTarget getTarget(final String path) {
-
- Client client = ClientBuilder.newClient();
- return client.target(path);
- }
-
- public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) {
- return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
-
- }
-
- public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password,byte[] data, String contentType) {
- return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).post(Entity.entity(data, contentType));
-
- }
-
- public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) {
-
- return target.request().header("Authorization", "Basic " + authHeader).get();
-
- }
-
- public static Response postResponsewtBasicAuth(WebTarget target, String authHeader,byte[] data,String contentType) {
-
- return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType));
-
- }
-
- public static Response getResponsewtNoAuth(WebTarget target) {
-
- return target.request().get();
-
- }
-
- public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) {
- return target.request().post(Entity.entity(data, contentType));
-
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
deleted file mode 100644
index 76bf5ce..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
-import org.apache.http.HttpException;
-import org.glassfish.jersey.internal.util.Base64;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.http.CacheUse;
-import com.att.nsa.apiClient.http.HttpClient;
-import com.att.nsa.mr.client.MRClient;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-public class MRBaseClient extends HttpClient implements MRClient {
-
-
- protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, stdSvcPort);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
- TimeUnit.MILLISECONDS, 32, 32, 600000);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- @Override
- public void close() {
- }
-
- protected Set<String> jsonArrayToSet(JSONArray a) {
- if (a == null)
- return null;
-
- final TreeSet<String> set = new TreeSet<String>();
- for (int i = 0; i < a.length(); i++) {
- set.add(a.getString(i));
- }
- return set;
- }
-
- public void logTo(Logger log) {
- fLog = log;
- replaceLogger(log);
- }
-
- protected Logger getLog() {
- return fLog;
- }
-
- private Logger fLog;
-
- public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
- final String password, final String protocalFlag) throws HttpException, JSONException {
- if ((null != username && null != password)) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding,data, contentType);
-
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
- }
- }
-
- public JSONObject postNoAuth(final String path, final byte[] data, String contentType)
- throws HttpException, JSONException {
- WebTarget target = null;
- Response response = null;
- if (contentType == null) {
- contentType = "text/pain";
- }
- target = DmaapClientUtil.getTarget(path);
-
- response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
-
- return getResponseDataInJson(response);
- }
-
- public String postWithResponse(final String path, final byte[] data, final String contentType,
- final String username, final String password, final String protocolFlag)
- throws HttpException, JSONException {
- String responseData = null;
- if ((null != username && null != password)) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding,data, contentType);
-
- responseData = (String)response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
- }
- }
-
- public String postNoAuthWithResponse(final String path, final byte[] data, String contentType)
- throws HttpException, JSONException {
-
- String responseData = null;
- WebTarget target = null;
- Response response = null;
- if (contentType == null) {
- contentType = "text/pain";
- }
- target = DmaapClientUtil.getTarget(path);
-
- response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
- responseData = (String) response.readEntity(String.class);
- return responseData;
- }
-
- public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,
- final String authDate, final String username, final String password, final String protocolFlag)
- throws HttpException, JSONException {
- if ((null != username && null != password)) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response =DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
- }
- }
-
- public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
- final String authKey, final String authDate, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if ((null != username && null != password)) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
- responseData = (String)response.readEntity(String.class);
- return responseData;
-
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
- }
- }
-
- public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
- throws HttpException, JSONException {
- if (null != username && null != password) {
-
- WebTarget target=null;
- Response response=null;
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target = DmaapClientUtil.getTarget(path);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
- } else {
- target = DmaapClientUtil.getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
-
- }
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
- }
- }
-
- public String getResponse(final String path, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if (null != username && null != password) {
- WebTarget target=null;
- Response response=null;
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target = DmaapClientUtil.getTarget(path);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
- } else {
- target = DmaapClientUtil.getTarget(path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
- response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
- }
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
-
- String transactionid = response.getHeaderString("transactionid");
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
- }
-
- responseData = (String)response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
- }
- }
-
- public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
- final String password, final String protocolFlag) throws HttpException, JSONException {
- if (null != username && null != password) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
-
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
- }
- }
-
- public JSONObject getNoAuth(final String path) throws HttpException, JSONException {
-
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(path);
- response = DmaapClientUtil.getResponsewtNoAuth(target);
-
- return getResponseDataInJson(response);
- }
-
- public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
- final String password, final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if (null != username && null != password) {
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
-
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
-
- String transactionid = response.getHeaderString("transactionid");
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
- }
-
- responseData = (String)response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(
- "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
- }
- }
-
- public String getNoAuthResponse(String path, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- WebTarget target=null;
- Response response=null;
- target = DmaapClientUtil.getTarget(path, username, password);
- response = DmaapClientUtil.getResponsewtNoAuth(target);
-
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
-
- String transactionid = response.getHeaderString("transactionid");
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
- }
-
- responseData = (String)response.readEntity(String.class);
- return responseData;
-
- }
-
-
- private JSONObject getResponseDataInJson(Response response) throws JSONException {
- try {
- MRClientFactory.HTTPHeadersMap = response.getHeaders();
-
-
- // MultivaluedMap<String, Object> headersMap =
- // for(String key : headersMap.keySet()) {
- String transactionid = response.getHeaderString("transactionid");
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info("TransactionId : " + transactionid);
- }
-
-
- if (response.getStatus() == 403) {
- JSONObject jsonObject = null;
- jsonObject = new JSONObject();
- JSONArray jsonArray = new JSONArray();
- jsonArray.put(response.getEntity());
- jsonObject.put("result", jsonArray);
- jsonObject.put("status", response.getStatus());
- return jsonObject;
- }
- String responseData = (String)response.readEntity(String.class);
-
- JSONTokener jsonTokener = new JSONTokener(responseData);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- jsonObject.put("status", response.getStatus());
- } else {
- jsonObject = new JSONObject(jsonTokener);
- jsonObject.put("status", response.getStatus());
- }
-
- return jsonObject;
- } catch (JSONException excp) {
- fLog.error("DMAAP - Error reading response data.", excp);
- return null;
- }
-
- }
-
- public String getHTTPErrorResponseMessage(String responseString) {
-
- String response = null;
- int beginIndex = 0;
- int endIndex = 0;
- if (responseString.contains("<body>")) {
-
- beginIndex = responseString.indexOf("body>") + 5;
- endIndex = responseString.indexOf("</body");
- response = responseString.substring(beginIndex, endIndex);
- }
-
- return response;
-
- }
-
- public String getHTTPErrorResponseCode(String responseString) {
-
- String response = null;
- int beginIndex = 0;
- int endIndex = 0;
- if (responseString.contains("<title>")) {
- beginIndex = responseString.indexOf("title>") + 6;
- endIndex = responseString.indexOf("</title");
- response = responseString.substring(beginIndex, endIndex);
- }
-
- return response;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java
deleted file mode 100644
index 7d1e396..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRBatchPublisher.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.zip.GZIPOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.http.HttpClient;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-
-/**
- * This is a batching publisher class that allows the client to publish messages
- * in batches that are limited in terms of size and/or hold time.
- *
- * @author author
- * @deprecated This class's tricky locking doesn't quite work
- *
- */
-@Deprecated
-public class MRBatchPublisher implements MRBatchingPublisher
-{
- public static final long kMinMaxAgeMs = 1;
-
- /**
- * Create a batch publisher.
- *
- * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
- * @param topic the topic to publish to
- * @param maxBatchSize the maximum size of a batch
- * @param maxAgeMs the maximum age of a batch
- */
- public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- if ( maxAgeMs < kMinMaxAgeMs )
- {
- fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
- maxAgeMs = kMinMaxAgeMs;
- }
-
- try {
- fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException (e);
- }
-
- // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
- // the oldest msg to hit max age? (locking is complicated, but should be do-able)
- fExec = new ScheduledThreadPoolExecutor ( 1 );
- fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- fSender.setApiCredentials ( apiKey, apiSecret );
- }
-
- @Override
- public void clearApiCredentials ()
- {
- fSender.clearApiCredentials ();
- }
-
- /**
- * Send the given message with the given partition
- * @param partition
- * @param msg
- * @throws IOException
- */
- @Override
- public int send ( String partition, String msg ) throws IOException
- {
- return send ( new message ( partition, msg ) );
- }
- @Override
- public int send ( String msg ) throws IOException
- {
- return send ( new message ( "",msg ) );
- }
- /**
- * Send the given message
- * @param userMsg a message
- * @throws IOException
- */
- @Override
- public int send ( message userMsg ) throws IOException
- {
- final LinkedList<message> list = new LinkedList<message> ();
- list.add ( userMsg );
- return send ( list );
- }
-
- /**
- * Send the given set of messages
- * @param msgs the set of messages, sent in order of iteration
- * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
- * @throws IOException
- */
- @Override
- public int send ( Collection<message> msgs ) throws IOException
- {
- if ( msgs.isEmpty() )
- {
- fSender.queue ( msgs );
- }
- return fSender.size ();
- }
-
- @Override
- public int getPendingMessageCount ()
- {
- return fSender.size ();
- }
-
- /**
- * Send any pending messages and close this publisher.
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public void close ()
- {
- try
- {
- final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
- if ( remains.isEmpty() )
- {
- fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
- + "(Consider using the alternate close method to capture unsent messages in this case.)" );
- }
- }
- catch ( InterruptedException e )
- {
- fLog.warn ( "Possible message loss. " + e.getMessage(), e );
- Thread.currentThread().interrupt();
- }
- catch ( IOException e )
- {
- fLog.warn ( "Possible message loss. " + e.getMessage(), e );
- }
- }
-
- public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
- {
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
- fExec.shutdown ();
-
- final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
- final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
- while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
- {
- fSender.checkSend ( true );
- Thread.sleep ( 250 );
- }
-
- final LinkedList<message> result = new LinkedList<message> ();
- fSender.drainTo ( result );
- return result;
- }
-
- private final ScheduledThreadPoolExecutor fExec;
- private final Sender fSender;
-
- private static class TimestampedMessage extends message
- {
- public TimestampedMessage ( message m )
- {
- super ( m );
- timestamp = System.currentTimeMillis ();
- }
- public final long timestamp;
- }
-
- private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
-
- private class Sender extends MRBaseClient implements Runnable
- {
- public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
- {
- super ( baseUrls );
-
- fNextBatch = new LinkedList<TimestampedMessage> ();
- fSendingBatch = null;
- fTopic = topic;
- fMaxBatchSize = maxBatch;
- fMaxAgeMs = maxAgeMs;
- fCompress = compress;
- fLock = new ReentrantReadWriteLock ();
- fWriteLock = fLock.writeLock ();
- fReadLock = fLock.readLock ();
- fDontSendUntilMs = 0;
- }
-
- public void drainTo ( LinkedList<message> list )
- {
- fWriteLock.lock ();
- try
- {
- if ( fSendingBatch != null )
- {
- list.addAll ( fSendingBatch );
- }
- list.addAll ( fNextBatch );
-
- fSendingBatch = null;
- fNextBatch.clear ();
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
-
- /**
- * Called periodically by the background executor.
- */
- @Override
- public void run ()
- {
- try
- {
- checkSend ( false );
- }
- catch ( IOException e )
- {
- fLog.warn ( "MR background send: " + e.getMessage () );
- fLog.error( "IOException " + e );
- }
- }
-
- public int size ()
- {
- fReadLock.lock ();
- try
- {
- return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
- }
- finally
- {
- fReadLock.unlock ();
- }
- }
-
- /**
- * Called to queue a message.
- * @param m
- * @throws IOException
- */
- public void queue ( Collection<message> msgs ) throws IOException
- {
- fWriteLock.lock ();
- try
- {
- for ( message userMsg : msgs )
- {
- if ( userMsg != null )
- {
- fNextBatch.add ( new TimestampedMessage ( userMsg ) );
- }
- else
- {
- fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
- }
- }
- }
- finally
- {
- fWriteLock.unlock();
- }
- checkSend ( false );
- }
-
- /**
- * Send a batch if the queue is long enough, or the first pending message is old enough.
- * @param force
- * @throws IOException
- */
- public void checkSend ( boolean force ) throws IOException
- {
- // hold a read lock just long enough to evaluate whether a batch
- // should be sent
- boolean shouldSend = false;
- fReadLock.lock ();
- try
- {
- if ( fNextBatch.isEmpty() )
- {
- final long nowMs = System.currentTimeMillis ();
- shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
- if ( !shouldSend )
- {
- final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, unless forced, wait after an error
- shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
- }
- // else: even in 'force', there's nothing to send, so shouldSend=false is fine
- }
- finally
- {
- fReadLock.unlock ();
- }
-
- // if a send is required, acquire a write lock, swap out the next batch,
- // swap in a fresh batch, and release the lock for the caller to start
- // filling a batch again. After releasing the lock, send the current
- // batch. (There could be more messages added between read unlock and
- // write lock, but that's fine.)
- if ( shouldSend )
- {
- fSendingBatch = null;
-
- fWriteLock.lock ();
- try
- {
- fSendingBatch = fNextBatch;
- fNextBatch = new LinkedList<TimestampedMessage> ();
- }
- finally
- {
- fWriteLock.unlock ();
- }
-
- if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
- {
- fLog.warn ( "Send failed, rebuilding send queue." );
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
-
- // the send failed. reconstruct the pending queue
- fWriteLock.lock ();
- try
- {
- final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
- fNextBatch = fSendingBatch;
- fNextBatch.addAll ( nextGroup );
- fSendingBatch = null;
- fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
- else
- {
- fWriteLock.lock ();
- try
- {
- fSendingBatch = null;
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
- }
- }
-
- private LinkedList<TimestampedMessage> fNextBatch;
- private LinkedList<TimestampedMessage> fSendingBatch;
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxAgeMs;
- private final boolean fCompress;
- private final ReentrantReadWriteLock fLock;
- private final WriteLock fWriteLock;
- private final ReadLock fReadLock;
- private long fDontSendUntilMs;
- private static final long sfWaitAfterError = 1000;
- }
-
- // this is static so that it's clearly not using any mutable member data outside of a lock
- private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( toSend.isEmpty() )
- {
- return true;
- }
-
- final long nowMs = System.currentTimeMillis ();
- final String url = MRConstants.makeUrl ( topic );
-
- log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
- try
- {
- OutputStream os = baseStream;
- if ( compress )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : toSend )
- {
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }
- catch ( IOException e )
- {
- log.warn ( "Problem writing stream to post: " + e.getMessage (),e );
- return false;
- }
-
- boolean result = false;
- final long startMs = System.currentTimeMillis ();
- try
- {
- client.post ( url, compress ?
- MRFormat.CAMBRIA_ZIP.toString () :
- MRFormat.CAMBRIA.toString (),
- baseStream.toByteArray(), false );
- result = true;
- }
- catch ( HttpException e )
- {
- log.warn ( "Problem posting to MR: " + e.getMessage(),e );
- }
- catch ( IOException e )
- {
- log.warn ( "Problem posting to MR: " + e.getMessage(),e );
- }
-
- log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
- return result;
- }
-
- @Override
- public void logTo ( Logger log )
- {
- fLog = log;
- }
-
- @Override
- public MRPublisherResponse sendBatchWithResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java b/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java
deleted file mode 100644
index bb6299d..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRClientVersionInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MRClientVersionInfo
-{
- private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class);
- public static String getVersion ()
- {
- return version;
- }
-
- private static final Properties props = new Properties();
- private static final String version;
- static
- {
- String use = null;
- try
- {
- final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" );
- if ( is != null )
- {
- props.load ( is );
- use = props.getProperty ( "MRClientVersion", null );
- }
- }
- catch ( IOException e )
- {
- logger.error("exception: ", e);
- }
- version = use;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java b/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java
deleted file mode 100644
index cb0fc31..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConstants.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.http.HttpHost;
-
-class MRConstants
-{
- private static final String PROTOCOL = "http";
- public static final String context = "/";
- public static final String kBasePath = "events/";
- public static final int kStdMRServicePort = 8080;
-
- public static String escape ( String s )
- {
- try
- {
- return URLEncoder.encode ( s, "UTF-8");
- }
- catch ( UnsupportedEncodingException e )
- {
- throw new IllegalArgumentException(e);
- }
- }
-
- public static String makeUrl ( String rawTopic )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer().
- append ( MRConstants.context ).
- append ( MRConstants.kBasePath ).
- append ( cleanTopic );
- return url.toString ();
- }
-
- public static String makeUrl ( final String host, final String rawTopic )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer();
-
- if (!host.startsWith("http") || !host.startsWith("https") ) {
- url.append( PROTOCOL + "://" );
- }
- url.append(host);
- url.append ( MRConstants.context );
- url.append ( MRConstants.kBasePath );
- url.append ( cleanTopic );
- return url.toString ();
- }
-
- public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer();
-
- if (transferprotocol !=null && !transferprotocol.equals("")) {
- url.append( transferprotocol + "://" );
- }else{
- url.append( PROTOCOL + "://" );
- }
- url.append(host);
- url.append ( MRConstants.context );
- url.append ( MRConstants.kBasePath );
- url.append ( cleanTopic );
- if(parttion!=null && !parttion.equalsIgnoreCase(""))
- url.append("?partitionKey=").append(parttion);
- return url.toString ();
- }
- public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId )
- {
- final String cleanConsumerGroup = escape ( rawConsumerGroup );
- final String cleanConsumerId = escape ( rawConsumerId );
- return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
- }
-
- /**
- * Create a list of HttpHosts from an input list of strings. Input strings have
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param hosts
- * @return a list of hosts
- */
- public static List<HttpHost> createHostsList(Collection<String> hosts)
- {
- final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
- for ( String host : hosts )
- {
- if ( host.length () == 0 ) continue;
- convertedHosts.add ( hostForString ( host ) );
- }
- return convertedHosts;
- }
-
- /**
- * Return an HttpHost from an input string. Input string has
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param hosts
- * @return a list of hosts
- */
- public static HttpHost hostForString ( String host )
- {
- if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
-
- String hostPart = host;
- int port = kStdMRServicePort;
-
- final int colon = host.indexOf ( ':' );
- if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
- if ( colon > 0 )
- {
- hostPart = host.substring ( 0, colon ).trim();
-
- final String portPart = host.substring ( colon + 1 ).trim();
- if ( portPart.length () > 0 )
- {
- try
- {
- port = Integer.parseInt ( portPart );
- }
- catch ( NumberFormatException x )
- {
- throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
- }
- }
- // else: use default port on "foo:"
- }
-
- return new HttpHost ( hostPart, port );
- }
-
- public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) {
- final String cleanConsumerGroup = escape ( fGroup );
- final String cleanConsumerId = escape ( fId );
-
- StringBuffer url = new StringBuffer();
-
- if (transferprotocol !=null && !transferprotocol.equals("")) {
- url.append( transferprotocol + "://" );
- }else{
- url.append( PROTOCOL + "://" );
- }
-
- url.append(host);
- url.append(context);
- url.append(kBasePath);
- url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId);
-
- return url.toString();
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
deleted file mode 100644
index bc156ea..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ /dev/null
@@ -1,675 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import com.att.aft.dme2.api.DME2Client;
-import com.att.aft.dme2.api.DME2Exception;
-import com.att.nsa.mr.client.HostSelector;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.http.HttpException;
-import org.apache.http.HttpStatus;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
-
- private Logger log = LoggerFactory.getLogger(this.getClass().getName());
-
- public static final String routerFilePath = null;
-
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- public String consumerFilePath;
-
- private static final String SUCCESS_MESSAGE = "Success";
- private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
- private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
-
- private final String fTopic;
- private final String fGroup;
- private final String fId;
- private final int fTimeoutMs;
- private final int fLimit;
- private String fFilter;
- private String username;
- private String password;
- private String host;
- private HostSelector fHostSelector = null;
- private String url;
- private DME2Client sender;
- private String authKey;
- private String authDate;
- private Properties props;
- private HashMap<String, String> DMETimeOuts;
- private long dme2ReplyHandlerTimeoutMs;
- private long longPollingMs;
-
- public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
- String apiSecret_password) throws MalformedURLException {
- this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
- false);
- }
-
- public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
- boolean allowSelfSignedCerts) throws MalformedURLException {
- super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
-
- fTopic = topic;
- fGroup = consumerGroup;
- fId = consumerId;
- fTimeoutMs = timeoutMs;
- fLimit = limit;
- fFilter = filter;
-
- fHostSelector = new HostSelector(hostPart);
- }
-
- @Override
- public Iterable<String> fetch() throws IOException, Exception {
- // fetch with the timeout and limit set in constructor
- return fetch(fTimeoutMs, fLimit);
- }
-
- @Override
- public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
- final LinkedList<String> msgs = new LinkedList<>();
-
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- dmeConfigure(timeoutMs, limit);
- try {
- String reply = sender.sendAndWait(timeoutMs + 10000L);
- final JSONObject o = getResponseDataInJson(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = get(urlPath, username, password, protocolFlag);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- try {
- final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = getNoAuth(urlPath);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- } catch (Exception e) {
- throw e;
- }
-
- return msgs;
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse() {
- // fetch with the timeout and limit set in constructor
- return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
- final LinkedList<String> msgs = new LinkedList<String>();
- MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- dmeConfigure(timeoutMs, limit);
-
- long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
- : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
- String reply = sender.sendAndWait(timeout);
-
- final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- createMRConsumerResponse(reply, mrConsumerResponse);
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- String response = getResponse(urlPath, username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
-
- } catch (JSONException e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("json exception: ", e);
- } catch (HttpException e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("http exception: ", e);
- } catch (DME2Exception e) {
- mrConsumerResponse.setResponseCode(e.getErrorCode());
- mrConsumerResponse.setResponseMessage(e.getErrorMessage());
- log.error("DME2 exception: ", e);
- } catch (Exception e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("exception: ", e);
- }
- mrConsumerResponse.setActualMessages(msgs);
- return mrConsumerResponse;
- }
-
- @Override
- protected void reportProblemWithResponse() {
- log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
- super.reportProblemWithResponse();
- fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
- }
-
- private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
- if (reply.startsWith("{")) {
- JSONObject jObject = new JSONObject(reply);
- String message = jObject.getString("message");
- int status = jObject.getInt("status");
-
- mrConsumerResponse.setResponseCode(Integer.toString(status));
-
- if (null != message) {
- mrConsumerResponse.setResponseMessage(message);
- }
- } else if (reply.startsWith("<")) {
- mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
- mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- } else {
- mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
- }
- }
-
- private JSONObject getResponseDataInJson(String response) {
- try {
- JSONTokener jsonTokener = new JSONTokener(response);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else {
- jsonObject = new JSONObject(jsonTokener);
- }
-
- return jsonObject;
- } catch (JSONException excp) {
- log.error("DMAAP - Error reading response data.", excp);
- return null;
- }
- }
-
- private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
- JSONTokener jsonTokener = new JSONTokener(response);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if (null != response && response.length() == 0) {
- return null;
- }
-
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else if ('{' == firstChar) {
- return null;
- } else if ('<' == firstChar) {
- return null;
- } else {
- jsonObject = new JSONObject(jsonTokener);
- }
-
- return jsonObject;
- }
-
- private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
- this.longPollingMs = timeoutMs;
- String latitude = props.getProperty("Latitude");
- String longitude = props.getProperty("Longitude");
- String version = props.getProperty("Version");
- String serviceName = props.getProperty("ServiceName");
- String env = props.getProperty("Environment");
- String partner = props.getProperty("Partner");
- String routeOffer = props.getProperty("routeOffer");
- String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
- String protocol = props.getProperty("Protocol");
- String methodType = props.getProperty("MethodType");
- String dmeuser = props.getProperty("username");
- String dmepassword = props.getProperty("password");
- String contenttype = props.getProperty("contenttype");
- String handlers = props.getProperty("sessionstickinessrequired");
-
- /**
- * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
- * provided use the routeOffer value for auto failover within a cluster
- */
-
- String preferredRouteKey = readRoute("preferredRouteKey");
-
- if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
- + "&routeoffer=" + preferredRouteKey;
- } else if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
- } else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
- }
-
- if (timeoutMs != -1)
- url = url + "&timeout=" + timeoutMs;
- if (limit != -1)
- url = url + "&limit=" + limit;
-
- // Add filter to DME2 Url
- if (fFilter != null && fFilter.length() > 0)
- url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
-
- DMETimeOuts = new HashMap<>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- DMETimeOuts.put("Content-Type", contenttype);
- System.setProperty("AFT_LATITUDE", latitude);
- System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
-
- // SSL changes
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
- System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
- System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
- // SSL changes
-
- long dme2PerEndPointTimeoutMs;
- try {
- dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
- // backward compatibility
- if (dme2PerEndPointTimeoutMs <= 0) {
- dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
- }
- } catch (NumberFormatException nfe) {
- // backward compatibility
- dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
- getLog().debug(
- "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
- }
-
- try {
- dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
- } catch (NumberFormatException nfe) {
- try {
- long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
- getLog().debug(
- "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
- + dme2ReplyHandlerTimeoutMs);
- } catch (NumberFormatException e) {
- // backward compatibility
- dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
- getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
- }
- }
- // backward compatibility
- if (dme2ReplyHandlerTimeoutMs <= 0) {
- dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
- }
-
- sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
- sender.setAllowAllHttpReturnCodes(true);
- sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
- if (dmeuser != null && dmepassword != null) {
- sender.setCredentials(dmeuser, dmepassword);
- }
- sender.setHeaders(DMETimeOuts);
- sender.setPayload("");
- if (handlers != null && handlers.equalsIgnoreCase("yes")) {
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- } else {
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
- }
-
- protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
- final StringBuilder contexturl = new StringBuilder(url);
- final StringBuilder adds = new StringBuilder();
-
- if (timeoutMs > -1) {
- adds.append("timeout=").append(timeoutMs);
- }
-
- if (limit > -1) {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("limit=").append(limit);
- }
-
- if (fFilter != null && fFilter.length() > 0) {
- try {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- log.error("exception at createUrlPath () : ", e);
- }
- }
-
- if (adds.length() > 0) {
- contexturl.append("?").append(adds.toString());
- }
-
- return contexturl.toString();
- }
-
- private String readRoute(String routeKey) {
- try {
- MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
- } catch (Exception ex) {
- log.error("Reply Router Error " + ex);
- }
- return MRClientFactory.prop.getProperty(routeKey);
- }
-
- public static List<String> stringToList(String str) {
- final LinkedList<String> set = new LinkedList<>();
- if (str != null) {
- final String[] parts = str.trim().split(",");
- for (String part : parts) {
- final String trimmed = part.trim();
- if (trimmed.length() > 0) {
- set.add(trimmed);
- }
- }
- }
- return set;
- }
-
- public static String getRouterFilePath() {
- return routerFilePath;
- }
-
- public static void setRouterFilePath(String routerFilePath) {
- MRSimplerBatchPublisher.routerFilePath = routerFilePath;
- }
-
- public String getConsumerFilePath() {
- return consumerFilePath;
- }
-
- public void setConsumerFilePath(String consumerFilePath) {
- this.consumerFilePath = consumerFilePath;
- }
-
- public String getProtocolFlag() {
- return protocolFlag;
- }
-
- public void setProtocolFlag(String protocolFlag) {
- this.protocolFlag = protocolFlag;
- }
-
- public Properties getProps() {
- return props;
- }
-
- public void setProps(Properties props) {
- this.props = props;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getAuthKey() {
- return authKey;
- }
-
- public void setAuthKey(String authKey) {
- this.authKey = authKey;
- }
-
- public String getAuthDate() {
- return authDate;
- }
-
- public void setAuthDate(String authDate) {
- this.authDate = authDate;
- }
-
- public String getfFilter() {
- return fFilter;
- }
-
- public void setfFilter(String fFilter) {
- this.fFilter = fFilter;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java b/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java
deleted file mode 100644
index 1028adf..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-enum MRFormat
-{
- /**
- * Messages are sent using MR's message format.
- */
- CAMBRIA
- {
- @Override
- public String toString() { return "application/cambria"; }
- },
-
- /**
- * Messages are sent using MR's message format with compression.
- */
- CAMBRIA_ZIP
- {
- @Override
- public String toString() { return "application/cambria-zip"; }
- },
-
- /**
- * messages are sent as simple JSON objects.
- */
- JSON
- {
- @Override
- public String toString() { return "application/json"; }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java
deleted file mode 100644
index 609540b..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRMetaClient.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.mr.client.MRIdentityManager;
-import com.att.nsa.mr.client.MRTopicManager;
-
-public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager
-{
- private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class);
- public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException
- {
- super ( baseUrls );
- }
-
- @Override
- public Set<String> getTopics () throws IOException
- {
- final TreeSet<String> set = new TreeSet<String> ();
- try
- {
- final JSONObject topicSet = get ( "/topics" );
- final JSONArray a = topicSet.getJSONArray ( "topics" );
- for ( int i=0; i<a.length (); i++ )
- {
- set.add ( a.getString ( i ) );
- }
- }
- catch ( HttpObjectNotFoundException e )
- {
- getLog().warn ( "No /topics endpoint on service." );
- logger.error("HttpObjectNotFoundException: ", e);
- }
- catch ( JSONException e )
- {
- getLog().warn ( "Bad /topics result from service." );
- logger.error("JSONException: ", e);
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
- return set;
- }
-
- @Override
- public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- try
- {
- final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) );
- return new TopicInfo ()
- {
- @Override
- public String getOwner ()
- {
- return topicData.optString ( "owner", null );
- }
-
- @Override
- public String getDescription ()
- {
- return topicData.optString ( "description", null );
- }
-
- @Override
- public Set<String> getAllowedProducers ()
- {
- final JSONObject acl = topicData.optJSONObject ( "writerAcl" );
- if ( acl != null && acl.optBoolean ( "enabled", true ) )
- {
- return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
- }
- return null;
- }
-
- @Override
- public Set<String> getAllowedConsumers ()
- {
- final JSONObject acl = topicData.optJSONObject ( "readerAcl" );
- if ( acl != null && acl.optBoolean ( "enabled", true ) )
- {
- return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
- }
- return null;
- }
- };
- }
- catch ( JSONException e )
- {
- throw new IOException ( e );
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
- }
-
- @Override
- public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException
- {
- final JSONObject o = new JSONObject ();
- o.put ( "topicName", topicName );
- o.put ( "topicDescription", topicDescription );
- o.put ( "partitionCount", partitionCount );
- o.put ( "replicationCount", replicationCount );
- post ( "/topics/create", o, false );
- }
-
- @Override
- public void deleteTopic ( String topic ) throws HttpException, IOException
- {
- delete ( "/topics/" + MRConstants.escape ( topic ) );
- }
-
- @Override
- public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return null == getAllowedProducers ( topic );
- }
-
- @Override
- public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return getTopicMetadata ( topic ).getAllowedProducers ();
- }
-
- @Override
- public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
- }
-
- @Override
- public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException
- {
- delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
- }
-
- @Override
- public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return null == getAllowedConsumers ( topic );
- }
-
- @Override
- public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return getTopicMetadata ( topic ).getAllowedConsumers ();
- }
-
- @Override
- public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
- }
-
- @Override
- public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException
- {
- delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
- }
-
- @Override
- public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException
- {
- try
- {
- final JSONObject o = new JSONObject ();
- o.put ( "email", email );
- o.put ( "description", description );
- final JSONObject reply = post ( "/apiKeys/create", o, true );
- return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) );
- }
- catch ( JSONException e )
- {
- // the response doesn't meet our expectation
- throw new MRApiException ( "The API key response is incomplete.", e );
- }
- }
-
- @Override
- public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) );
- if ( keyEntry == null )
- {
- return null;
- }
-
- return new ApiKey ()
- {
- @Override
- public String getEmail ()
- {
- final JSONObject aux = keyEntry.optJSONObject ( "aux" );
- if ( aux != null )
- {
- return aux.optString ( "email" );
- }
- return null;
- }
-
- @Override
- public String getDescription ()
- {
- final JSONObject aux = keyEntry.optJSONObject ( "aux" );
- if ( aux != null )
- {
- return aux.optString ( "description" );
- }
- return null;
- }
- };
- }
-
- @Override
- public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- final JSONObject o = new JSONObject ();
- if ( email != null ) o.put ( "email", email );
- if ( description != null ) o.put ( "description", description );
- patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o );
- }
-
- @Override
- public void deleteCurrentApiKey () throws HttpException, IOException
- {
- delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) );
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
deleted file mode 100644
index 4f44d30..0000000
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ /dev/null
@@ -1,927 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPOutputStream;
-
-import javax.ws.rs.core.MultivaluedMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.http.HttpException;
-import org.apache.http.HttpStatus;
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-
-import com.att.aft.dme2.api.DME2Client;
-import com.att.aft.dme2.api.DME2Exception;
-import com.att.nsa.mr.client.HostSelector;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
- private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
-
- public static class Builder {
- public Builder() {
- }
-
- public Builder againstUrls(Collection<String> baseUrls) {
- fUrls = baseUrls;
- return this;
- }
-
- public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
- {
- fUrls = baseUrls;
- fServiceName = serviceName;
- fTransportype = transportype;
- return this;
- }
-
- public Builder onTopic(String topic) {
- fTopic = topic;
- return this;
- }
-
- public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- return this;
- }
-
- public Builder compress(boolean compress) {
- fCompress = compress;
- return this;
- }
-
- public Builder httpThreadTime(int threadOccuranceTime) {
- this.threadOccuranceTime = threadOccuranceTime;
- return this;
- }
-
- public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
- fAllowSelfSignedCerts = allowSelfSignedCerts;
- return this;
- }
-
- public Builder withResponse(boolean withResponse) {
- fWithResponse = withResponse;
- return this;
- }
-
- public MRSimplerBatchPublisher build() {
- if (!fWithResponse) {
- try {
- return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
- fAllowSelfSignedCerts, threadOccuranceTime);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- } else {
- try {
- return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
- fAllowSelfSignedCerts, fMaxBatchSize);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- }
-
- private Collection<String> fUrls;
- private Collection<String> fServiceName;
- private String fTransportype;
- private String fTopic;
- private int fMaxBatchSize = 100;
- private long fMaxBatchAgeMs = 1000;
- private boolean fCompress = false;
- private int threadOccuranceTime = 50;
- private boolean fAllowSelfSignedCerts = false;
- private boolean fWithResponse = false;
-
- };
-
- @Override
- public int send(String partition, String msg) {
- return send(new message(partition, msg));
- }
-
- @Override
- public int send(String msg) {
- return send(new message(null, msg));
- }
-
- @Override
- public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
- list.add(msg);
- return send(list);
- }
-
- @Override
- public synchronized int send(Collection<message> msgs) {
- if (fClosed) {
- throw new IllegalStateException("The publisher was closed.");
- }
-
- for (message userMsg : msgs) {
- fPending.add(new TimestampedMessage(userMsg));
- }
- return getPendingMessageCount();
- }
-
- @Override
- public synchronized int getPendingMessageCount() {
- return fPending.size();
- }
-
- @Override
- public void close() {
- try {
- final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.isEmpty()) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
- }
- } catch (InterruptedException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- }
- }
-
- @Override
- public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
- synchronized (this) {
- fClosed = true;
-
- // stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- fExec.shutdown();
- }
-
- final long now = Clock.now();
- final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
- final long timeoutAtMs = now + waitInMs;
-
- while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
- send(true);
- Thread.sleep(250);
- }
-
- synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
- fPending.drainTo(result);
- return result;
- }
- }
-
- /**
- * Possibly send a batch to the MR server. This is called by the background
- * thread and the close() method
- *
- * @param force
- */
- private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
- }
- }
-
- private synchronized boolean shouldSendNow() {
- boolean shouldSend = false;
- if (fPending.isEmpty()) {
- final long nowMs = Clock.now();
-
- shouldSend = (fPending.size() >= fMaxBatchSize);
- if (!shouldSend) {
- final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
- }
- return shouldSend;
- }
-
- /**
- * Method to parse published JSON Objects and Arrays
- *
- * @return JSONArray
- */
- private JSONArray parseJSON() {
- JSONArray jsonArray = new JSONArray();
- for (TimestampedMessage m : fPending) {
- JSONTokener jsonTokener = new JSONTokener(m.fMsg);
- JSONObject jsonObject = null;
- JSONArray tempjsonArray = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- tempjsonArray = new JSONArray(jsonTokener);
- if (null != tempjsonArray) {
- for (int i = 0; i < tempjsonArray.length(); i++) {
- jsonArray.put(tempjsonArray.getJSONObject(i));
- }
- }
- } else {
- jsonObject = new JSONObject(jsonTokener);
- jsonArray.put(jsonObject);
- }
-
- }
- return jsonArray;
- }
-
- private synchronized boolean sendBatch() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.size() < 1) {
- return true;
- }
-
- final long nowMs = Clock.now();
-
- if (this.fHostSelector != null) {
- host = this.fHostSelector.selectBaseHost();
- }
-
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
-
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- OutputStream os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
- JSONArray jsonArray = parseJSON();
- os.write(jsonArray.toString().getBytes());
- os.close();
-
- } else if (contentType.equalsIgnoreCase("text/plain")) {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
-
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
-
- }
- os.close();
- }
-
- final long startMs = Clock.now();
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
- DME2Configue();
-
- Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- sender.setPayload(os.toString());
- String dmeResponse = sender.sendAndWait(5000L);
-
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
- getLog().info(logLine);
- fPending.clear();
- return true;
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
- username, password, protocolFlag);
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
- return false;
- }
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
- fPending.clear();
- return true;
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
- protocolFlag);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
- return false;
- }
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
- fPending.clear();
- return true;
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
- return false;
- }
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
- fPending.clear();
- return true;
- }
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- } catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
- }
- return false;
- }
-
- public synchronized MRPublisherResponse sendBatchWithResponse() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.isEmpty()) {
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage("No Messages to send");
- return pubResponse;
- }
-
- final long nowMs = Clock.now();
-
- host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
- OutputStream os = null;
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
- JSONArray jsonArray = parseJSON();
- os.write(jsonArray.toString().getBytes());
- } else if (contentType.equalsIgnoreCase("text/plain")) {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
-
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
-
- }
- }
-
- final long startMs = Clock.now();
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
- try {
- DME2Configue();
-
- Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- sender.setPayload(os.toString());
-
- String dmeResponse = sender.sendAndWait(5000L);
-
- pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
- final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
- getLog().info(logLine);
- fPending.clear();
-
- } catch (DME2Exception x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(x.getErrorCode());
- pubResponse.setResponseMessage(x.getErrorMessage());
- } catch (URISyntaxException x) {
-
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
- } catch (Exception x) {
-
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
- logger.error("exception: ", x);
-
- }
-
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
- authDate, username, password, protocolFlag);
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
-
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
- fPending.clear();
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
- password, protocolFlag);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- final String logLine = String.valueOf((Clock.now() - startMs));
- getLog().info(logLine);
- fPending.clear();
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- final String logLine = String.valueOf((Clock.now() - startMs));
- getLog().info(logLine);
- fPending.clear();
- return pubResponse;
- }
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
-
- } catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
-
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
-
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
-
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
-
- }
-
- finally {
- if (fPending.size() > 0) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
- pubResponse.setPendingMsgs(fPending.size());
- }
- if (os != null) {
- try {
- os.close();
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage("Error in closing Output Stream");
- }
- }
- }
-
- return pubResponse;
- }
-
- public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-
- if (reply.isEmpty()) {
-
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- mrPubResponse.setResponseMessage("Please verify the Producer properties");
- } else if (reply.startsWith("{")) {
- JSONObject jObject = new JSONObject(reply);
- if (jObject.has("message") && jObject.has("status")) {
- String message = jObject.getString("message");
- if (null != message) {
- mrPubResponse.setResponseMessage(message);
- }
- mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
- } else {
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrPubResponse.setResponseMessage(reply);
- }
- } else if (reply.startsWith("<")) {
- String responseCode = getHTTPErrorResponseCode(reply);
- if (responseCode.contains("403")) {
- responseCode = "403";
- }
- mrPubResponse.setResponseCode(responseCode);
- mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- }
-
- return mrPubResponse;
- }
-
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxBatchAgeMs;
- private final boolean fCompress;
- private int threadOccuranceTime;
- private boolean fClosed;
- private String username;
- private String password;
- private String host;
-
- // host selector
- private HostSelector fHostSelector = null;
-
- private final LinkedBlockingQueue<TimestampedMessage> fPending;
- private long fDontSendUntilMs;
- private final ScheduledThreadPoolExecutor fExec;
-
- private String latitude;
- private String longitude;
- private String version;
- private String serviceName;
- private String env;
- private String partner;
- private String routeOffer;
- private String subContextPath;
- private String protocol;
- private String methodType;
- private String url;
- private String dmeuser;
- private String dmepassword;
- private String contentType;
- private static final long sfWaitAfterError = 10000;
- private HashMap<String, String> DMETimeOuts;
- private DME2Client sender;
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- private String authKey;
- private String authDate;
- private String handlers;
- private Properties props;
- public static String routerFilePath;
- protected static final Map<String, String> headers = new HashMap<String, String>();
- public static MultivaluedMap<String, Object> headersMap;
-
- private MRPublisherResponse pubResponse;
-
- public MRPublisherResponse getPubResponse() {
- return pubResponse;
- }
-
- public void setPubResponse(MRPublisherResponse pubResponse) {
- this.pubResponse = pubResponse;
- }
-
- public static String getRouterFilePath() {
- return routerFilePath;
- }
-
- public static void setRouterFilePath(String routerFilePath) {
- MRSimplerBatchPublisher.routerFilePath = routerFilePath;
- }
-
- public Properties getProps() {
- return props;
- }
-
- public void setProps(Properties props) {
- this.props = props;
- }
-
- public String getProtocolFlag() {
- return protocolFlag;
- }
-
- public void setProtocolFlag(String protocolFlag) {
- this.protocolFlag = protocolFlag;
- }
-
- private void DME2Configue() throws Exception {
- try {
-
- latitude = props.getProperty("Latitude");
- longitude = props.getProperty("Longitude");
- version = props.getProperty("Version");
- serviceName = props.getProperty("ServiceName");
- env = props.getProperty("Environment");
- partner = props.getProperty("Partner");
- routeOffer = props.getProperty("routeOffer");
- subContextPath = props.getProperty("SubContextPath") + fTopic;
-
- protocol = props.getProperty("Protocol");
- methodType = props.getProperty("MethodType");
- dmeuser = props.getProperty("username");
- dmepassword = props.getProperty("password");
- contentType = props.getProperty("contenttype");
- handlers = props.getProperty("sessionstickinessrequired");
- routerFilePath = props.getProperty("DME2preferredRouterFilePath");
-
- /**
- * Changes to DME2Client url to use Partner for auto failover
- * between data centers When Partner value is not provided use the
- * routeOffer value for auto failover within a cluster
- */
-
- String partitionKey = props.getProperty("partition");
-
- if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
- + partner;
- if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
- url = url + "&partitionKey=" + partitionKey;
- }
- } else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
- if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
- url = url + "&partitionKey=" + partitionKey;
- }
- }
-
- DMETimeOuts = new HashMap<String, String>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- DMETimeOuts.put("Content-Type", contentType);
- System.setProperty("AFT_LATITUDE", latitude);
- System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
- // System.setProperty("DME2.DEBUG", "true");
-
- // SSL changes
- // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
- System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
- System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-
- // SSL changes
-
- sender = new DME2Client(new URI(url), 5000L);
-
- sender.setAllowAllHttpReturnCodes(true);
- sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
- sender.setCredentials(dmeuser, dmepassword);
- sender.setHeaders(DMETimeOuts);
- if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- } else {
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
- } catch (DME2Exception x) {
- getLog().warn(x.getMessage(), x);
- throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
- } catch (URISyntaxException x) {
-
- getLog().warn(x.getMessage(), x);
- throw new URISyntaxException(url, x.getMessage());
- } catch (Exception x) {
-
- getLog().warn(x.getMessage(), x);
- throw new IllegalArgumentException(x.getMessage());
- }
- }
-
- private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
- boolean compress) throws MalformedURLException {
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fHostSelector = new HostSelector(hosts, null);
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
-
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
- fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor(1);
- pubResponse = new MRPublisherResponse();
-
- }
-
- private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
- boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fHostSelector = new HostSelector(hosts, null);
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
- threadOccuranceTime = httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
- fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor(1);
- fExec.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- send(false);
- }
- }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
- pubResponse = new MRPublisherResponse();
- }
-
- private static class TimestampedMessage extends message {
- public TimestampedMessage(message m) {
- super(m);
- timestamp = Clock.now();
- }
-
- public final long timestamp;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getContentType() {
- return contentType;
- }
-
- public void setContentType(String contentType) {
- this.contentType = contentType;
- }
-
- public String getAuthKey() {
- return authKey;
- }
-
- public void setAuthKey(String authKey) {
- this.authKey = authKey;
- }
-
- public String getAuthDate() {
- return authDate;
- }
-
- public void setAuthDate(String authDate) {
- this.authDate = authDate;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java b/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java
deleted file mode 100644
index 102794e..0000000
--- a/src/main/java/com/att/nsa/mr/client/response/MRConsumerResponse.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.response;
-
-public class MRConsumerResponse {
-
- private String responseCode;
-
- private String responseMessage;
-
- private Iterable<String> actualMessages;
-
-
-
-
- public String getResponseCode() {
- return responseCode;
- }
-
- public void setResponseCode(String responseCode) {
- this.responseCode = responseCode;
- }
-
- public String getResponseMessage() {
- return responseMessage;
- }
-
- public void setResponseMessage(String responseMessage) {
- this.responseMessage = responseMessage;
- }
-
- public Iterable<String> getActualMessages() {
- return actualMessages;
- }
-
- public void setActualMessages(Iterable<String> actualMessages) {
- this.actualMessages = actualMessages;
- }
-
-
-}
diff --git a/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java b/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java
deleted file mode 100644
index 96c39d6..0000000
--- a/src/main/java/com/att/nsa/mr/client/response/MRPublisherResponse.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.client.response;
-
-/**
- * Response for Publisher
- * @author author
- *
- */
-public class MRPublisherResponse {
- private String responseCode;
-
- private String responseMessage;
-
- private int pendingMsgs;
-
- public String getResponseCode() {
- return responseCode;
- }
-
- public void setResponseCode(String responseCode) {
- this.responseCode = responseCode;
- }
-
- public String getResponseMessage() {
- return responseMessage;
- }
-
- public void setResponseMessage(String responseMessage) {
- this.responseMessage = responseMessage;
- }
-
- public int getPendingMsgs() {
- return pendingMsgs;
- }
-
- public void setPendingMsgs(int pendingMsgs) {
- this.pendingMsgs = pendingMsgs;
- }
-
- @Override
- public String toString() {
- return "Response Code:" + this.responseCode + ","
- + "Response Message:" + this.responseMessage + "," + "Pending Messages Count"
- + this.pendingMsgs;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java b/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java
deleted file mode 100644
index 4f04dec..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.dme.client;
-
-
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-//import com.att.aft.dme2.api.util.DME2Constants;
-//import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
-//import com.att.aft.dme2.api.util.LogMessage;
-//import com.att.aft.dme2.api.util.LogUtil;
-public class DefaultLoggingFailoverFaultHandler /*implements DME2FailoverFaultHandler*/ {
- //TODO: This code may be enable in the future when we implement DME2FailoverFaultHandler interface
- /** The logger. */
-
-
-// @Override
-
-// // LogUtil.INSTANCE.report(logger, Level.WARNING, LogMessage.SEP_FAILOVER, context.getService(),context.getRequestURL(),context.getRouteOffer(),context.getResponseCode(),context.getException());
-// }
-// @Override
-// /**
-// * The DME2Exchange already has a log message when the route offer is failed over. We dont need to log it again here.
-// */
-
-// //noop
-//
-
-}
-
diff --git a/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java b/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java
deleted file mode 100644
index 4363902..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/HeaderReplyHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.dme.client;
-
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
-import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler;
-import com.att.aft.dme2.api.util.DME2ExchangeResponseContext;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-
-
-
-
-
- public class HeaderReplyHandler implements DME2ExchangeReplyHandler {
-
- private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () );
-
-
- @Override public void handleFault(DME2ExchangeFaultContext responseData) {
- // TODO Auto-generated method stub
-
- }
- @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) {
- // TODO Auto-generated method stub
-
- }
-@Override public void handleReply(DME2ExchangeResponseContext responseData) {
-
- if(responseData != null) {
- MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders();
- if (responseData.getResponseHeaders().get("transactionId")!=null)
- fLog.info("Transaction Id : " + responseData.getResponseHeaders().get("transactionId"));
-
- }
-}
-
-}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java
deleted file mode 100644
index 2da5ce9..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteReplyHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.dme.client;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.InputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
-import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler;
-import com.att.aft.dme2.api.util.DME2ExchangeResponseContext;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-
-public class PreferredRouteReplyHandler implements DME2ExchangeReplyHandler {
- private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () );
- @Override public void handleReply(DME2ExchangeResponseContext responseData) {
-
- if(responseData != null) {
- MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders();
- if (responseData.getResponseHeaders().get("transactionId")!=null)
-
- fLog.info("Transaction_Id : " + responseData.getResponseHeaders().get("transactionId"));
-
- if(responseData.getRouteOffer() != null ){
- routeWriter("preferredRouteKey",responseData.getRouteOffer());
-
- }
- }
-}
-
- @Override public void handleFault(DME2ExchangeFaultContext responseData) {
- // TODO Auto-generated method stub
- //StaticCache.getInstance().setHandleFaultInvoked(true);
- }
- @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) {
- // TODO Auto-generated method stub
-
- }
- public void routeWriter(String routeKey, String routeValue){
-
- try(FileWriter routeWriter=new FileWriter(new File (MRSimplerBatchPublisher.routerFilePath))){
- routeWriter.write(routeKey+"="+routeValue);
- routeWriter.close();
-
- }catch(Exception ex){
- fLog.error("Reply Router Error " + ex);
- }
-
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java b/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java
deleted file mode 100644
index fbc32a7..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/PreferredRouteRequestHandler.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.dme.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.aft.dme2.api.util.DME2ExchangeRequestContext;
-import com.att.aft.dme2.api.util.DME2ExchangeRequestHandler;
-import com.att.nsa.mr.client.MRClientFactory;
-
-public class PreferredRouteRequestHandler implements DME2ExchangeRequestHandler {
- private Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
- @Override
- public void handleRequest(DME2ExchangeRequestContext requestData) {
-
- if (requestData != null) {
-
- requestData.setPreferredRouteOffer(readRoute("preferredRouteKey"));
- }
- }
-
- public String readRoute(String routeKey) {
-
- try {
-
- MRClientFactory.prop.load(MRClientFactory.routeReader);
-
- } catch (Exception ex) {
- logger.error("Request Router Error " + ex);
- }
- return MRClientFactory.prop.getProperty(routeKey);
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
deleted file mode 100644
index ce0138c..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package com.att.nsa.mr.dme.client;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-
-import java.util.List;
-
-public class SimpleExampleConsumer {
-
- private static final Logger logger = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-
- private SimpleExampleConsumer() {
- }
-
- public static void main(String[] args) {
-
- long count = 0;
- long nextReport = 5000;
- String key;
-
- final long startMs = System.currentTimeMillis();
-
- try {
-
- final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties");
- while (true) {
- for (String msg : cc.fetch()) {
- logger.debug("Message Received: " + msg);
- }
- // Header for DME2 Call.
- MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
- for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) {
- key = entry.getKey();
- logger.debug("Header Key " + key);
- logger.debug("Header Value " + headersMap.get(key));
- }
- // Header for HTTP Call.
-
- Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
- for (Map.Entry<String, String> entry : dme2headersMap.entrySet()) {
- key = entry.getKey();
- logger.debug("Header Key " + key);
- logger.debug("Header Value " + dme2headersMap.get(key));
- }
-
- if (count > nextReport) {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- }
- }
- } catch (Exception x) {
- logger.error(x.toString());
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java
deleted file mode 100644
index f9e830c..0000000
--- a/src/main/java/com/att/nsa/mr/dme/client/SimpleExamplePublisher.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package com.att.nsa.mr.dme.client;
-
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.core.MultivaluedMap;
-
-import org.json.JSONObject;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- * An example of how to use the Java publisher.
- *
- * @author author
- */
-public class SimpleExamplePublisher {
- static String content = null;
- static String messageSize = null;
- static String transport = null;
- static String messageCount = null;
-
- public void publishMessage(String producerFilePath) throws IOException, InterruptedException {
-
- // create our publisher
-
- // publish some messages
-
-
- StringBuilder sb = new StringBuilder();
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher(producerFilePath);
-
- if (content.equalsIgnoreCase("text/plain")) {
- for (int i = 0; i < Integer.parseInt(messageCount); i++) {
- for (int j = 0; j < Integer.parseInt(messageSize); j++) {
- sb.append("T");
- }
-
- pub.send(sb.toString());
- }
- } else if (content.equalsIgnoreCase("application/cambria")) {
- for (int i = 0; i < Integer.parseInt(messageCount); i++) {
- for (int j = 0; j < Integer.parseInt(messageSize); j++) {
- sb.append("C");
- }
-
- pub.send("Key", sb.toString());
- }
- } else if (content.equalsIgnoreCase("application/json")) {
- for (int i = 0; i < Integer.parseInt(messageCount); i++) {
-
- final JSONObject msg12 = new JSONObject();
- msg12.put("Name", "DMaaP Reference Client to Test jason Message");
-
- pub.send(msg12.toString());
-
- }
- }
-
- // ...
-
- // close the publisher to make sure everything's sent before exiting.
- // The batching
- // publisher interface allows the app to get the set of unsent messages.
- // It could
- // write them to disk, for example, to try to send them later.
- /* final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
- if (stuck.size() > 0) {
- System.err.println(stuck.size() + " messages unsent");
- } else {
- System.out.println("Clean exit; all messages sent.");
- }*/
-
- if (transport.equalsIgnoreCase("HTTP")) {
- MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
- for (String key : headersMap.keySet()) {
- System.out.println("Header Key " + key);
- System.out.println("Header Value " + headersMap.get(key));
- }
- } else {
- Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
- for (String key : dme2headersMap.keySet()) {
- System.out.println("Header Key " + key);
- System.out.println("Header Value " + dme2headersMap.get(key));
- }
- }
-
- }
-
- public static void main(String[] args) throws InterruptedException, Exception {
-
- String producerFilePath = args[0];
- content = args[1];
- messageSize = args[2];
- transport = args[3];
- messageCount = args[4];
-
-
-
-
-
- SimpleExamplePublisher publisher = new SimpleExamplePublisher();
-
- publisher.publishMessage("D:\\SG\\producer.properties");
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/logging/MRAppender.java b/src/main/java/com/att/nsa/mr/logging/MRAppender.java
deleted file mode 100644
index 4bb3e71..0000000
--- a/src/main/java/com/att/nsa/mr/logging/MRAppender.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-/**
- *
- */
-package com.att.nsa.mr.logging;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.LoggingEvent;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher;
-
-/**
- * @author author
- *
- */
-public class MRAppender extends AppenderSkeleton {
-
- private Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
- private MRPublisher fPublisher;
-
- //Provided through log4j configuration
- private String topic;
- private String partition;
- private String hosts;
- private int maxBatchSize = 1;
- private int maxAgeMs = 1000;
- private boolean compress = false;
-
- /**
- *
- */
- public MRAppender() {
- super();
- }
-
- /**
- * @param isActive
- */
- public MRAppender(boolean isActive) {
- super(isActive);
- }
-
- /* (non-Javadoc)
- * @see org.apache.log4j.Appender#close()
- */
- @Override
- public void close() {
- if (!this.closed) {
- this.closed = true;
- fPublisher.close();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.log4j.Appender#requiresLayout()
- */
- @Override
- public boolean requiresLayout() {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.apache.log4j.AppenderSkeleton#append(org.apache.log4j.spi.LoggingEvent)
- */
- @Override
- protected void append(LoggingEvent event) {
- final String message;
-
- if (this.layout == null) {
- message = event.getRenderedMessage();
- } else {
- message = this.layout.format(event);
- }
-
- try {
- fPublisher.send(partition, message);
- } catch (IOException e) {
- logger.error("IOException: ", e);
- }
- }
-
- @Override
- public void activateOptions() {
- if (hosts != null && topic != null && partition != null) {
- fPublisher = MRClientFactory.createBatchingPublisher(hosts.split(","), topic, maxBatchSize, maxAgeMs, compress);
- } else {
- logger.error("The Hosts, Topic, and Partition parameter are required to create a MR Log4J Appender");
- }
- }
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPartition() {
- return partition;
- }
-
- public void setPartition(String partition) {
- this.partition = partition;
- }
-
- public String getHosts() {
- return hosts;
- }
-
- public void setHosts(String hosts) {
- this.hosts = hosts;
- }
-
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- public void setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
- public int getMaxAgeMs() {
- return maxAgeMs;
- }
-
- public void setMaxAgeMs(int maxAgeMs) {
- this.maxAgeMs = maxAgeMs;
- }
-
- public boolean isCompress() {
- return compress;
- }
-
- public void setCompress(boolean compress) {
- this.compress = compress;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java
deleted file mode 100644
index 60971c1..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package com.att.nsa.mr.test.clients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- * A simple publisher that reads from std in, sending each line as a message.
- * @author author
- */
-public class ConsolePublisher
-{
-
- private static final Logger logger = LoggerFactory.getLogger(ConsolePublisher.class);
- private ConsolePublisher() {
- }
- public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException
- {
- // read the hosts(s) from the command line
- final String hosts = args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com";
-
- // read the topic name from the command line
- final String topic = args.length > 1 ? args[1] : "TEST-TOPIC";
-
- // read the topic name from the command line
- final String partition = args.length > 2 ? args[2] : UUID.randomUUID ().toString ();
-
- // set up some batch limits and the compression flag
- final int maxBatchSize = 100;
- final long maxAgeMs = 250;
- final boolean withGzip = false;
-
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip );
-
- final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) );
- try
- {
- String line = null;
- while ( ( line = cin.readLine () ) != null )
- {
- pub.send ( partition, line );
- }
- }
- finally
- {
- List<message> leftovers = null;
- try
- {
- leftovers = pub.close ( 10, TimeUnit.SECONDS );
- }
- catch ( InterruptedException e )
- {
- logger.error( "Send on close interrupted." );
- Thread.currentThread().interrupt();
- }
- for ( message m : leftovers )
- {
- logger.error( "Unsent message: " + m.fMsg );
- }
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
deleted file mode 100644
index a4a176e..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-/**
- *
- */
-package com.att.nsa.mr.test.clients;
-
-/**
- * @author author
- *
- */
-public enum ProtocolTypeConstants {
-
- DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
-
- private String value;
-
- private ProtocolTypeConstants(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java
deleted file mode 100644
index 44e5205..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.clients;
-
-import java.util.LinkedList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-
-public class SampleConsumer {
- private SampleConsumer() {
- }
- public static void main ( String[] args )
- {
- final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
-
-
- log.info("Sample Consumer Class executing");
- final String topic = "com.att.app.dmaap.mr.testingTopic";
- final String url = ( args.length > 1 ? args[1] : "localhost:8181" );
- final String group = ( args.length > 2 ? args[2] :"grp" );
-
- final String id = ( args.length > 3 ? args[3] : "1" );
-
- long count = 0;
- long nextReport = 5000;
-
- final long startMs = System.currentTimeMillis ();
-
- final LinkedList<String> urlList = new LinkedList<> ();
- for ( String u : url.split ( "," ) )
- {
- urlList.add ( u );
- }
-
- final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" );
- try
- {
- while ( true )
- {
- for ( String msg : cc.fetch () )
- {
- log.info ( "" + (++count) + ": " + msg );
- }
-
- if ( count > nextReport )
- {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis ();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- log.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
- }
- log.info ( "" + (++count) + ": consumed message" );
- }
- }
- catch ( Exception x )
- {
- log.error( x.getClass().getName () + ": " + x.getMessage () );
- throw new IllegalArgumentException(x);
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java
deleted file mode 100644
index 0233dcb..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.clients;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-public class SamplePublisher {
- public static void main ( String[] args ) throws IOException, InterruptedException
- {
- final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class);
- // read the hosts(s) from the command line
- final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" );
-
- // read the topic name from the command line
-
- final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" );
-
- // set up some batch limits and the compression flag
- final int maxBatchSize = 100;
- final int maxAgeMs = 250;
- final boolean withGzip = false;
-
- // create our publisher
-
- final MRBatchingPublisher pub = new PublisherBuilder ().
- usingHosts ( hosts ).
- onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs).
- authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ).
- build ()
- ;
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "name", "tttttttttttttttt" );
- msg1.put ( "greeting", "ooooooooooooooooo" );
- pub.send ( "MyPartitionKey", msg1.toString () );
-
- final JSONObject msg2 = new JSONObject ();
- msg2.put ( "now", System.currentTimeMillis () );
- pub.send ( "MyOtherPartitionKey", msg2.toString () );
-
- // ...
-
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
- if ( stuck.isEmpty())
- {
- LOG.warn ( stuck.size() + " messages unsent" );
- }
- else
- {
- LOG.info ( "Clean exit; all messages sent." );
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
deleted file mode 100644
index 0e3ee5a..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package com.att.nsa.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-
-public class SimpleExampleConsumer {
-
- static FileWriter routeWriter = null;
- static Properties props = null;
- static FileReader routeReader = null;
-
- public static void main(String[] args) {
- final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-
- long count = 0;
- long nextReport = 5000;
-
- final long startMs = System.currentTimeMillis();
-
- try {
- String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
-
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- routeReader = new FileReader(new File(routeFilePath));
- props = new Properties();
- final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
- int i = 0;
- while (i < 10) {
- Thread.sleep(2);
- i++;
- for (String msg : cc.fetch()) {
- // System.out.println ( "" + (++count) + ": " + msg );
- System.out.println(msg);
- }
-
- if (count > nextReport) {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
- }
- }
- } catch (Exception x) {
- System.err.println(x.getClass().getName() + ": " + x.getMessage());
- LOG.error("exception: ", x);
- }
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
deleted file mode 100644
index 433ab9f..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-
-public class SimpleExampleConsumerWithReturnResponse {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumerWithReturnResponse.class);
-
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
- public static void main ( String[] args )
- {
-
- long count = 0;
- long nextReport = 5000;
-
- final long startMs = System.currentTimeMillis ();
-
- try
- {
- String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
-
-
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" );
- while ( true )
- {
- MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse();
- System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode());
-
- System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage());
-
- System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages());
- /*for ( String msg : mrConsumerResponse.getActualMessages() )
- {
- //System.out.println ( "" + (++count) + ": " + msg );
- System.out.println(msg);
- }*/
- if ( count > nextReport )
- {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis ();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
- }
- }
- }
- catch ( Exception x )
- {
- System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
- LOG.error("exception: ", x);
- }
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java
deleted file mode 100644
index 8a6c586..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package com.att.nsa.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- * An example of how to use the Java publisher.
- * @author author
- */
-public class SimpleExamplePublisher
-{
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
- public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception
- {
-
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "Name", "Sprint" );
-
- pub.send ( "First cambria messge" );
- pub.send ( "MyPartitionKey", msg1.toString () );
-
- final JSONObject msg2 = new JSONObject ();
-
-
-
- // ...
-
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
- if ( stuck.isEmpty() )
- {
- System.err.println ( stuck.size() + " messages unsent" );
- }
- else
- {
- System.out.println ( "Clean exit; all messages sent." );
- }
- }
-
- public static void main(String []args) throws InterruptedException, Exception{
-
- String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
-
- SimpleExamplePublisher publisher = new SimpleExamplePublisher();
-
-
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- publisher.publishMessage("/src/main/resources/dme2/producer.properties");
- }
-
-}
-
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java
deleted file mode 100644
index 9d179b2..0000000
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.clients;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.json.JSONObject;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
- /**
- *An example of how to use the Java publisher.
- * @author author
- *
- */
- public class SimpleExamplePublisherWithResponse
- {
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
-
- public static void main(String []args) throws InterruptedException, Exception{
-
- String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
- String msgCount = args[0];
- SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- int i=0;
- while (i< Integer.valueOf(msgCount))
- {
- publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount));
- i++;
- }
- }
-
- public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException
- {
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true);
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
-
- msg1.put ( "Partition:1", "Message:"+count);
- msg1.put ( "greeting", "Hello .." );
-
-
- pub.send ( "1", msg1.toString());
- pub.send ( "1", msg1.toString());
-
- MRPublisherResponse res= pub.sendBatchWithResponse();
-
- System.out.println("Pub response->"+res.toString());
- }
-
-
- }
diff --git a/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java
deleted file mode 100644
index 56bec36..0000000
--- a/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.support;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-
-/**
- * A helper for unit testing systems that use a MRPublisher. When setting
- * up your test, inject an instance into MRClientFactory to have it return
- * the mock client.
- *
- * @author author
- *
- */
-public class MRBatchingPublisherMock implements MRBatchingPublisher
-{
- public class Entry
- {
- public Entry ( String partition, String msg )
- {
- fPartition = partition;
- fMessage = msg;
- }
-
- @Override
- public String toString ()
- {
- return fMessage;
- }
-
- public final String fPartition;
- public final String fMessage;
- }
-
- public MRBatchingPublisherMock ()
- {
- fCaptures = new LinkedList<> ();
- }
-
- public interface Listener
- {
- void onMessage ( Entry e );
- }
- public void addListener ( Listener listener )
- {
- fListeners.add ( listener );
- }
-
- public List<Entry> getCaptures ()
- {
- return getCaptures ( new MessageFilter () { @Override public boolean match ( String msg ) { return true; } } );
- }
-
- public interface MessageFilter
- {
- boolean match ( String msg );
- }
-
- public List<Entry> getCaptures ( MessageFilter filter )
- {
- final LinkedList<Entry> result = new LinkedList<> ();
- for ( Entry capture : fCaptures )
- {
- if ( filter.match ( capture.fMessage ) )
- {
- result.add ( capture );
- }
- }
- return result;
- }
-
- public int received ()
- {
- return fCaptures.size();
- }
-
- public void reset ()
- {
- fCaptures.clear ();
- }
-
- @Override
- public int send ( String partition, String msg )
- {
- final Entry e = new Entry ( partition, msg );
-
- fCaptures.add ( e );
- for ( Listener l : fListeners )
- {
- l.onMessage ( e );
- }
- return 1;
- }
-
- @Override
- public int send ( message msg )
- {
- return send ( msg.fPartition, msg.fMsg );
- }
- @Override
- public int send ( String msg )
- {
- return 1;
-
- }
-
- @Override
- public int send ( Collection<message> msgs )
- {
- int sum = 0;
- for ( message m : msgs )
- {
- sum += send ( m );
- }
- return sum;
- }
-
- @Override
- public int getPendingMessageCount ()
- {
- return 0;
- }
-
- @Override
- public List<message> close ( long timeout, TimeUnit timeoutUnits )
- {
- return new LinkedList<> ();
- }
-
- @Override
- public void close ()
- {
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- }
-
- @Override
- public void clearApiCredentials ()
- {
- }
-
- @Override
- public void logTo ( Logger log )
- {
- }
-
- private final LinkedList<Entry> fCaptures;
- private LinkedList<Listener> fListeners = new LinkedList<> ();
- @Override
- public MRPublisherResponse sendBatchWithResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java b/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java
deleted file mode 100644
index c731030..0000000
--- a/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.test.support;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-
-/**
- * A helper for unit testing systems that use a MRConsumer. When setting
- * up your test, inject an instance into MRClientFactory to have it return
- * the mock client.
- *
- * @author author
- *
- */
-public class MRConsumerMock implements MRConsumer
-{
- public class Entry
- {
- public Entry ( long waitMs, int statusCode, List<String> msgs )
- {
- fWaitMs = waitMs;
- fStatusCode = statusCode;
- fStatusMsg = null;
- fMsgs = new LinkedList<> ( msgs );
- }
-
- public Entry ( long waitMs, int statusCode, String statusMsg )
- {
- fWaitMs = waitMs;
- fStatusCode = statusCode;
- fStatusMsg = statusMsg;
- fMsgs = null;
- }
-
- public LinkedList<String> run () throws IOException
- {
- try
- {
- Thread.sleep ( fWaitMs );
- if ( fStatusCode >= 200 && fStatusCode <= 299 )
- {
- return fMsgs;
- }
- throw new IOException ( "" + fStatusCode + " " + fStatusMsg );
- }
- catch ( InterruptedException e )
- {
- Thread.currentThread().interrupt();
- throw new IOException ( e );
- }
- }
-
- private final long fWaitMs;
- private final int fStatusCode;
- private final String fStatusMsg;
- private final LinkedList<String> fMsgs;
- }
-
- public MRConsumerMock ()
- {
- fReplies = new LinkedList<> ();
- }
-
- @Override
- public void close ()
- {
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- }
-
- @Override
- public void clearApiCredentials ()
- {
- }
-
- public synchronized void add ( Entry e )
- {
- fReplies.add ( e );
- }
-
- public void addImmediateMsg ( String msg )
- {
- addDelayedMsg ( 0, msg );
- }
-
- public void addDelayedMsg ( long delay, String msg )
- {
- final LinkedList<String> list = new LinkedList<> ();
- list.add ( msg );
- add ( new Entry ( delay, 200, list ) );
- }
-
- public void addImmediateMsgGroup ( List<String> msgs )
- {
- addDelayedMsgGroup ( 0, msgs );
- }
-
- public void addDelayedMsgGroup ( long delay, List<String> msgs )
- {
- final LinkedList<String> list = new LinkedList<> ( msgs );
- add ( new Entry ( delay, 200, list ) );
- }
-
- public void addImmediateError ( int statusCode, String statusText )
- {
- add ( new Entry ( 0, statusCode, statusText ) );
- }
-
- @Override
- public Iterable<String> fetch () throws IOException
- {
- return fetch ( -1, -1 );
- }
-
- @Override
- public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException
- {
- return fReplies.size () > 0 ? fReplies.removeFirst ().run() : new LinkedList<String>();
- }
-
- @Override
- public void logTo ( Logger log )
- {
- }
-
- private final LinkedList<Entry> fReplies;
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
- int limit) {
- // TODO Auto-generated method stub
- return null;
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java b/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java
deleted file mode 100644
index df28fc0..0000000
--- a/src/main/java/com/att/nsa/mr/tools/ApiKeyCommand.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRIdentityManager;
-import com.att.nsa.mr.client.MRClient.MRApiException;
-import com.att.nsa.mr.client.MRIdentityManager.ApiKey;
-
-public class ApiKeyCommand implements Command<MRCommandContext>
-{
- final Logger log = LoggerFactory.getLogger(ApiKeyCommand.class);
- @Override
- public String[] getMatches ()
- {
- return new String[]{
- "key (create|update) (\\S*) (\\S*)",
- "key (list) (\\S*)",
- "key (revoke)",
- };
- }
-
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- if ( !context.checkClusterReady () )
- {
- throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." );
- }
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
- {
- final MRIdentityManager tm = MRClientFactory.createIdentityManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() );
- context.applyTracer ( tm );
-
- try
- {
- if ( parts[0].equals ( "list" ) )
- {
- final ApiKey key = tm.getApiKey ( parts[1] );
- if ( key != null )
- {
- out.println ( "email: " + key.getEmail () );
- out.println ( "description: " + key.getDescription () );
- }
- else
- {
- out.println ( "No key returned" );
- }
- }
- else if ( parts[0].equals ( "create" ) )
- {
- final ApiCredential ac = tm.createApiKey ( parts[1], parts[2] );
- if ( ac != null )
- {
- out.println ( " key: " + ac.getApiKey () );
- out.println ( "secret: " + ac.getApiSecret () );
- }
- else
- {
- out.println ( "No credential returned?" );
- }
- }
- else if ( parts[0].equals ( "update" ) )
- {
- tm.updateCurrentApiKey ( parts[1], parts[2] );
- out.println ( "Updated" );
- }
- else if ( parts[0].equals ( "revoke" ) )
- {
- tm.deleteCurrentApiKey ();
- out.println ( "Updated" );
- }
- }
- catch ( HttpObjectNotFoundException e )
- {
- out.println ( "Object not found: " + e.getMessage () );
- log.error("HttpObjectNotFoundException: ", e);
- }
- catch ( HttpException e )
- {
- out.println ( "HTTP exception: " + e.getMessage () );
- log.error("HttpException: ", e);
- }
- catch ( MRApiException e )
- {
- out.println ( "API exception: " + e.getMessage () );
- log.error("MRApiException: ", e);
- }
- catch ( IOException e )
- {
- out.println ( "IO exception: " + e.getMessage () );
- log.error("IOException: ", e);
- }
- finally
- {
- tm.close ();
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "key create <email> <description>" );
- out.println ( "key update <email> <description>" );
- out.println ( "key list <key>" );
- out.println ( "key revoke" );
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/AuthCommand.java b/src/main/java/com/att/nsa/mr/tools/AuthCommand.java
deleted file mode 100644
index 72f226b..0000000
--- a/src/main/java/com/att/nsa/mr/tools/AuthCommand.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.PrintStream;
-
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-
-public class AuthCommand implements Command<MRCommandContext>
-{
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
- {
- if ( parts.length > 0 )
- {
- context.setAuth ( parts[0], parts[1] );
- out.println ( "Now authenticating with " + parts[0] );
- }
- else
- {
- context.clearAuth ();
- out.println ( "No longer authenticating." );
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "auth <apiKey> <apiSecret>" );
- out.println ( "\tuse these credentials on subsequent transactions" );
- out.println ( "noauth" );
- out.println ( "\tdo not use credentials on subsequent transactions" );
- }
-
- @Override
- public String[] getMatches ()
- {
- return new String[]
- {
- "auth (\\S*) (\\S*)",
- "noauth"
- };
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java b/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java
deleted file mode 100644
index 5ecaf7c..0000000
--- a/src/main/java/com/att/nsa/mr/tools/ClusterCommand.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.PrintStream;
-
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-
-public class ClusterCommand implements Command<MRCommandContext>
-{
-
- @Override
- public String[] getMatches ()
- {
- return new String[]{
- "cluster",
- "cluster (\\S*)?",
- };
- }
-
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
- {
- if ( parts.length == 0 )
- {
- for ( String host : context.getCluster () )
- {
- out.println ( host );
- }
- }
- else
- {
- context.clearCluster ();
- for ( String part : parts )
- {
- String[] hosts = part.trim().split ( "\\s+" );
- for ( String host : hosts )
- {
- for ( String splitHost : MRConsumerImpl.stringToList(host) )
- {
- context.addClusterHost ( splitHost );
- }
- }
- }
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "cluster host1 host2 ..." );
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java b/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java
deleted file mode 100644
index e512769..0000000
--- a/src/main/java/com/att/nsa/mr/tools/MRCommandContext.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.util.Collection;
-import java.util.LinkedList;
-
-import com.att.nsa.apiClient.http.HttpClient;
-import com.att.nsa.apiClient.http.HttpTracer;
-import com.att.nsa.cmdtool.CommandContext;
-import com.att.nsa.mr.client.MRClient;
-
-public class MRCommandContext implements CommandContext
-{
- public MRCommandContext ()
- {
- fApiKey = null;
- fApiPwd = null;
-
- fCluster = new LinkedList<> ();
- fCluster.add ( "localhost" );
- }
-
- @Override
- public void requestShutdown ()
- {
- fShutdown = true;
- }
-
- @Override
- public boolean shouldContinue ()
- {
- return !fShutdown;
- }
-
- public void setAuth ( String key, String pwd ) { fApiKey = key; fApiPwd = pwd; }
- public void clearAuth () { setAuth(null,null); }
-
- public boolean checkClusterReady ()
- {
- return ( fCluster.isEmpty());
- }
-
- public Collection<String> getCluster ()
- {
- return new LinkedList<> ( fCluster );
- }
-
- public void clearCluster ()
- {
- fCluster.clear ();
- }
-
- public void addClusterHost ( String host )
- {
- fCluster.add ( host );
- }
-
- public String getApiKey () { return fApiKey; }
- public String getApiPwd () { return fApiPwd; }
-
- public void useTracer ( HttpTracer t )
- {
- fTracer = t;
- }
- public void noTracer () { fTracer = null; }
-
- public void applyTracer ( MRClient cc )
- {
- if ( cc instanceof HttpClient && fTracer != null )
- {
- ((HttpClient)cc).installTracer ( fTracer );
- }
- }
-
- private boolean fShutdown;
- private String fApiKey;
- private String fApiPwd;
- private final LinkedList<String> fCluster;
- private HttpTracer fTracer = null;
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/MRTool.java b/src/main/java/com/att/nsa/mr/tools/MRTool.java
deleted file mode 100644
index 7f1effd..0000000
--- a/src/main/java/com/att/nsa/mr/tools/MRTool.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.IOException;
-
-import com.att.nsa.cmdtool.CommandLineTool;
-import com.att.nsa.mr.client.impl.MRClientVersionInfo;
-
-public class MRTool extends CommandLineTool<MRCommandContext>
-{
- protected MRTool ()
- {
- super ( "MR Tool (" + MRClientVersionInfo.getVersion () + ")", "MR> " );
-
- registerCommand ( new ApiKeyCommand () );
- registerCommand ( new AuthCommand () );
- registerCommand ( new ClusterCommand () );
- registerCommand ( new MessageCommand () );
- registerCommand ( new TopicCommand () );
- registerCommand ( new TraceCommand () );
- }
-
- public static void main ( String[] args ) throws IOException
- {
- final MRTool ct = new MRTool ();
- final MRCommandContext ccc = new MRCommandContext ();
- ct.runFromMain ( args, ccc );
- }
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/MessageCommand.java b/src/main/java/com/att/nsa/mr/tools/MessageCommand.java
deleted file mode 100644
index 54e92ae..0000000
--- a/src/main/java/com/att/nsa/mr/tools/MessageCommand.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-public class MessageCommand implements Command<MRCommandContext>
-{
- final Logger logger = LoggerFactory.getLogger(ApiKeyCommand.class);
- @Override
- public String[] getMatches ()
- {
- return new String[]{
- "(post) (\\S*) (\\S*) (.*)",
- "(read) (\\S*) (\\S*) (\\S*)",
- };
- }
-
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- if ( !context.checkClusterReady () )
- {
- throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." );
- }
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
- {
- if ( parts[0].equalsIgnoreCase ( "read" ))
- {
- final MRConsumer cc = MRClientFactory.createConsumer ( context.getCluster (), parts[1], parts[2], parts[3],
- -1, -1, null, context.getApiKey(), context.getApiPwd() );
- context.applyTracer ( cc );
- try
- {
- for ( String msg : cc.fetch () )
- {
- out.println ( msg );
- }
- }
- catch ( Exception e )
- {
- out.println ( "Problem fetching messages: " + e.getMessage() );
- logger.error("Problem fetching messages: ", e);
- }
- finally
- {
- cc.close ();
- }
- }
- else
- {
- final MRBatchingPublisher pub=ToolsUtil.createBatchPublisher(context, parts[1]);
- try
- {
- pub.send ( parts[2], parts[3] );
- }
- catch ( IOException e )
- {
- out.println ( "Problem sending message: " + e.getMessage() );
- logger.error("Problem sending message: ", e);
- }
- finally
- {
- List<message> left = null;
- try
- {
- left = pub.close ( 500, TimeUnit.MILLISECONDS );
- }
- catch ( IOException e )
- {
- out.println ( "Problem sending message: " + e.getMessage() );
- logger.error("Problem sending message: ", e);
- }
- catch ( InterruptedException e )
- {
- out.println ( "Problem sending message: " + e.getMessage() );
- logger.error("Problem sending message: ", e);
- Thread.currentThread().interrupt();
- }
- if ( left != null && left.isEmpty() )
- {
- out.println ( left.size() + " messages not sent." );
- }
- }
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "post <topicName> <partition> <message>" );
- out.println ( "read <topicName> <consumerGroup> <consumerId>" );
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/ToolsUtil.java b/src/main/java/com/att/nsa/mr/tools/ToolsUtil.java
deleted file mode 100644
index 45b21d1..0000000
--- a/src/main/java/com/att/nsa/mr/tools/ToolsUtil.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package com.att.nsa.mr.tools;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder;
-
-public final class ToolsUtil {
-
- private ToolsUtil() {
- }
-
- public static MRBatchingPublisher createBatchPublisher(MRCommandContext context,String topicName){
-
- return new PublisherBuilder ().
- usingHosts ( context.getCluster () ).
- onTopic (topicName).
- authenticatedBy ( context.getApiKey(), context.getApiPwd() ).
- build ();
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/TopicCommand.java b/src/main/java/com/att/nsa/mr/tools/TopicCommand.java
deleted file mode 100644
index 4b1151e..0000000
--- a/src/main/java/com/att/nsa/mr/tools/TopicCommand.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRTopicManager;
-import com.att.nsa.mr.client.MRTopicManager.TopicInfo;
-
-public class TopicCommand implements Command<MRCommandContext>
-{
- final Logger logger = LoggerFactory.getLogger(ApiKeyCommand.class);
- @Override
- public String[] getMatches ()
- {
- return new String[]{
- "topic (list)",
- "topic (list) (\\S*)",
- "topic (create) (\\S*) (\\S*) (\\S*)",
- "topic (grant|revoke) (read|write) (\\S*) (\\S*)",
- };
- }
-
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- if ( !context.checkClusterReady () )
- {
- throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." );
- }
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
- {
- final MRTopicManager tm = MRClientFactory.createTopicManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() );
- context.applyTracer ( tm );
-
- try
- {
- if ( parts[0].equals ( "list" ) )
- {
- try
- {
- if ( parts.length == 1 )
- {
- for ( String topic : tm.getTopics () )
- {
- out.println ( topic );
- }
- }
- else
- {
- final TopicInfo ti = tm.getTopicMetadata ( parts[1] );
-
- final String owner = ti.getOwner ();
- out.println ( " owner: " + ( owner == null ? "<none>" : owner ) );
-
- final String desc = ti.getDescription ();
- out.println ( "description: " + ( desc == null ? "<none>" : desc ) );
-
- final Set<String> prods = ti.getAllowedProducers ();
- if ( prods != null )
- {
- out.println ( " write ACL: " );
- for ( String key : prods )
- {
- out.println ( "\t" + key );
- }
- }
- else
- {
- out.println ( " write ACL: <not active>" );
- }
-
- final Set<String> cons = ti.getAllowedConsumers ();
- if ( cons != null )
- {
- out.println ( " read ACL: " );
- for ( String key : cons )
- {
- out.println ( "\t" + key );
- }
- }
- else
- {
- out.println ( " read ACL: <not active>" );
- }
- }
- }
- catch ( IOException x )
- {
- out.println ( "Problem with request: " + x.getMessage () );
- logger.error("IOException: ", x);
- }
- catch ( HttpObjectNotFoundException e )
- {
- out.println ( "Not found: " + e.getMessage () );
- logger.error("HttpObjectNotFoundException: ", e);
- }
- }
- else if ( parts[0].equals ( "create" ) )
- {
- try
- {
- final int partitions = Integer.parseInt ( parts[2] );
- final int replicas = Integer.parseInt ( parts[3] );
-
- tm.createTopic ( parts[1], "", partitions, replicas );
- }
- catch ( HttpException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("HttpException: ", e);
- }
- catch ( IOException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("IOException: ", e);
- }
- catch ( NumberFormatException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("NumberFormatException: ", e);
- }
- }
- else if ( parts[0].equals ( "grant" ) )
- {
- try
- {
- if ( parts[1].equals ( "write" ) )
- {
- tm.allowProducer ( parts[2], parts[3] );
- }
- else if ( parts[1].equals ( "read" ) )
- {
- tm.allowConsumer ( parts[2], parts[3] );
- }
- }
- catch ( HttpException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("HttpException: ", e);
- }
- catch ( IOException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("IOException: ", e);
- }
- }
- else if ( parts[0].equals ( "revoke" ) )
- {
- try
- {
- if ( parts[1].equals ( "write" ) )
- {
- tm.revokeProducer ( parts[2], parts[3] );
- }
- else if ( parts[1].equals ( "read" ) )
- {
- tm.revokeConsumer ( parts[2], parts[3] );
- }
- }
- catch ( HttpException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("HttpException: ", e);
- }
- catch ( IOException e )
- {
- out.println ( "Problem with request: " + e.getMessage () );
- logger.error("IOException: ", e);
- }
- }
- }
- finally
- {
- tm.close ();
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "topic list" );
- out.println ( "topic list <topicName>" );
- out.println ( "topic create <topicName> <partitions> <replicas>" );
- out.println ( "topic grant write|read <topicName> <apiKey>" );
- out.println ( "topic revoke write|read <topicName> <apiKey>" );
- }
-
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/TraceCommand.java b/src/main/java/com/att/nsa/mr/tools/TraceCommand.java
deleted file mode 100644
index 0489172..0000000
--- a/src/main/java/com/att/nsa/mr/tools/TraceCommand.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.io.PrintStream;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-import com.att.nsa.apiClient.http.HttpTracer;
-import com.att.nsa.cmdtool.Command;
-import com.att.nsa.cmdtool.CommandNotReadyException;
-
-public class TraceCommand implements Command<MRCommandContext>
-{
- @Override
- public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
- {
- }
-
- @Override
- public void execute ( String[] parts, MRCommandContext context, final PrintStream out ) throws CommandNotReadyException
- {
- if ( parts[0].equalsIgnoreCase ( "on" ))
- {
- context.useTracer ( new HttpTracer ()
- {
- @Override
- public void outbound ( URI uri, Map<String, List<String>> headers, String method, byte[] entity )
- {
- out.println ( kLineBreak );
- out.println ( ">>> " + method + " " + uri.toString() );
- for ( Map.Entry<String,List<String>> e : headers.entrySet () )
- {
- final StringBuffer vals = new StringBuffer ();
- for ( String val : e.getValue () )
- {
- if ( vals.length () > 0 ) vals.append ( ", " );
- vals.append ( val );
- }
- out.println ( ">>> " + e.getKey () + ": " + vals.toString() );
- }
- if ( entity != null )
- {
- out.println ();
- out.println ( new String ( entity ) );
- }
- out.println ( kLineBreak );
- }
-
- @Override
- public void inbound ( Map<String, List<String>> headers, int statusCode, String responseLine, byte[] entity )
- {
- out.println ( kLineBreak );
- out.println ( "<<< " + responseLine );
- for ( Map.Entry<String,List<String>> e : headers.entrySet () )
- {
- final StringBuffer vals = new StringBuffer ();
- for ( String val : e.getValue () )
- {
- if ( vals.length () > 0 ) vals.append ( ", " );
- vals.append ( val );
- }
- out.println ( "<<< " + e.getKey () + ": " + vals.toString() );
- }
- if ( entity != null )
- {
- out.println ();
- out.println ( new String ( entity ) );
- }
- out.println ( kLineBreak );
- }
- } );
- }
- else
- {
- context.noTracer ();
- }
- }
-
- @Override
- public void displayHelp ( PrintStream out )
- {
- out.println ( "trace on|off" );
- out.println ( "\tWhen trace is on, HTTP interaction is printed to the console." );
- }
-
- @Override
- public String[] getMatches ()
- {
- return new String[]
- {
- "trace (on)",
- "trace (off)"
- };
- }
-
- private static final String kLineBreak = "======================================================================";
-}
diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
deleted file mode 100644
index 0539582..0000000
--- a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.mr.tools;
-
-import java.util.Properties;
-
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-public class ValidatorUtil {
-
- public static void validatePublisher(Properties props) {
- String transportType = props.getProperty("TransportType");
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
- validateForDME2(props);
- } else {
- validateForNonDME2(props);
- }
- String maxBatchSize = props.getProperty("maxBatchSize");
- if (maxBatchSize == null || maxBatchSize.isEmpty()) {
- throw new IllegalArgumentException ( "maxBatchSize is needed" );
- }
- String maxAgeMs = props.getProperty("maxAgeMs");
- if (maxAgeMs == null || maxAgeMs.isEmpty()) {
- throw new IllegalArgumentException ( "maxAgeMs is needed" );
- }
- String messageSentThreadOccurance = props.getProperty("MessageSentThreadOccurance");
- if (messageSentThreadOccurance == null || messageSentThreadOccurance.isEmpty()) {
- throw new IllegalArgumentException ( "MessageSentThreadOccurance is needed" );
- }
-
- }
-
- public static void validateSubscriber(Properties props) {
- String transportType = props.getProperty("TransportType");
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
- validateForDME2(props);
- } else {
- validateForNonDME2(props);
- }
- String group = props.getProperty("group");
- if (group == null || group.isEmpty()) {
- throw new IllegalArgumentException ( "group is needed" );
- }
- String id = props.getProperty("id");
- if (id == null || id.isEmpty()) {
- throw new IllegalArgumentException ( "Consumer (Id) is needed" );
- }
- }
-
- private static void validateForDME2(Properties props) {
- String serviceName = props.getProperty("ServiceName");
- if (serviceName == null || serviceName.isEmpty()) {
- throw new IllegalArgumentException ( "Servicename is needed" );
- }
- String topic = props.getProperty("topic");
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException ( "topic is needed" );
- }
- String username = props.getProperty("username");
- if (username == null || username.isEmpty()) {
- throw new IllegalArgumentException ( "username is needed" );
- }
- String password = props.getProperty("password");
- if (password == null || password.isEmpty()) {
- throw new IllegalArgumentException ( "password is needed" );
- }
- String dME2preferredRouterFilePath = props.getProperty("DME2preferredRouterFilePath");
- if (dME2preferredRouterFilePath == null || dME2preferredRouterFilePath.isEmpty()) {
- throw new IllegalArgumentException ( "DME2preferredRouterFilePath is needed" );
- }
- String partner = props.getProperty("Partner");
- String routeOffer = props.getProperty("routeOffer");
- if ((partner == null || partner.isEmpty()) && (routeOffer == null || routeOffer.isEmpty())) {
- throw new IllegalArgumentException ( "Partner or routeOffer is needed" );
- }
- String protocol = props.getProperty("Protocol");
- if (protocol == null || protocol.isEmpty()) {
- throw new IllegalArgumentException ( "Protocol is needed" );
- }
- String methodType = props.getProperty("MethodType");
- if (methodType == null || methodType.isEmpty()) {
- throw new IllegalArgumentException ( "MethodType is needed" );
- }
- String contenttype = props.getProperty("contenttype");
- if (contenttype == null || contenttype.isEmpty()) {
- throw new IllegalArgumentException ( "contenttype is needed" );
- }
- String latitude = props.getProperty("Latitude");
- if (latitude == null || latitude.isEmpty()) {
- throw new IllegalArgumentException ( "Latitude is needed" );
- }
- String longitude = props.getProperty("Longitude");
- if (longitude == null || longitude.isEmpty()) {
- throw new IllegalArgumentException ( "Longitude is needed" );
- }
- String aftEnv = props.getProperty("AFT_ENVIRONMENT");
- if (aftEnv == null || aftEnv.isEmpty()) {
- throw new IllegalArgumentException ( "AFT_ENVIRONMENT is needed" );
- }
- String version = props.getProperty("Version");
- if (version == null || version.isEmpty()) {
- throw new IllegalArgumentException ( "Version is needed" );
- }
- String environment = props.getProperty("Environment");
- if (environment == null || environment.isEmpty()) {
- throw new IllegalArgumentException ( "Environment is needed" );
- }
- String subContextPath = props.getProperty("SubContextPath");
- if (subContextPath == null || subContextPath.isEmpty()) {
- throw new IllegalArgumentException ( "SubContextPath is needed" );
- }
- String sessionstickinessrequired = props.getProperty("sessionstickinessrequired");
- if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) {
- throw new IllegalArgumentException ( "sessionstickinessrequired is needed" );
- }
- }
-
- private static void validateForNonDME2(Properties props) {
- String transportType = props.getProperty("TransportType");
- String host = props.getProperty("host");
- if (host == null || host.isEmpty()) {
- throw new IllegalArgumentException ( "Servicename is needed" );
- }
- String topic = props.getProperty("topic");
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException ( "topic is needed" );
- }
- String contenttype = props.getProperty("contenttype");
- if (contenttype == null || contenttype.isEmpty()) {
- throw new IllegalArgumentException ( "contenttype is needed" );
- }
- if (!ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(transportType)){
- String username = props.getProperty("username");
- if (username == null || username.isEmpty()) {
- throw new IllegalArgumentException ( "username is needed" );
- }
- String password = props.getProperty("password");
- if (password == null || password.isEmpty()) {
- throw new IllegalArgumentException ( "password is needed" );
- }
- }
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) {
- String authKey = props.getProperty("authKey");
- if (authKey == null || authKey.isEmpty()) {
- throw new IllegalArgumentException ( "authKey is needed" );
- }
- String authDate = props.getProperty("authDate");
- if (authDate == null || authDate.isEmpty()) {
- throw new IllegalArgumentException ( "authDate is needed" );
- }
-
- }
- }
-
-}