diff options
author | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:44:28 -0500 |
---|---|---|
committer | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:44:41 -0500 |
commit | 7d45c179879363222fcf49b30f75837f66d7f423 (patch) | |
tree | c5a344247515c1d8b74a6cc74bcea63541e4b46f /src/main/java/org/onap | |
parent | cc9de9bc6803212f0233e0e1bf06aa63fe8b7a6a (diff) |
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven
group id only.
Issue-id: DMAAP-74
Change-Id: I36c2aef063050c265640b79e6dc0e8ab7add8d22
Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/org/onap')
45 files changed, 0 insertions, 7455 deletions
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java deleted file mode 100644 index b30c2f1..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java +++ /dev/null @@ -1,191 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.Vector; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HostSelector -{ - private final TreeSet<String> fBaseHosts; - private final DelayQueue<BlacklistEntry> fBlacklist; - private String fIdealHost; - private String fCurrentHost; - private static final Logger log = LoggerFactory.getLogger(HostSelector.class); - - public HostSelector(String hostPart) - { - this(makeSet(hostPart), null); - } - - public HostSelector(Collection<String> baseHosts) - { - this(baseHosts, null); - } - - public HostSelector(Collection<String> baseHosts, String signature) - { - if (baseHosts.size() < 1) - { - throw new IllegalArgumentException("At least one host must be provided."); - } - - this.fBaseHosts = new TreeSet(baseHosts); - this.fBlacklist = new DelayQueue(); - this.fIdealHost = null; - - if (signature == null) { - return; - } - int index = Math.abs(signature.hashCode()) % baseHosts.size(); - - Iterator it = this.fBaseHosts.iterator(); - while (index-- > 0) - { - it.next(); - } - this.fIdealHost = ((String)it.next()); - } - - public String selectBaseHost() - { - if (this.fCurrentHost == null) - { - makeSelection(); - } - return this.fCurrentHost; - } - - public void reportReachabilityProblem(long blacklistUnit, TimeUnit blacklistTimeUnit) - { - if (this.fCurrentHost == null) - { - log.warn("Reporting reachability problem, but no host is currently selected."); - } - - if (blacklistUnit > 0L) - { - for (BlacklistEntry be : this.fBlacklist) - { - if (be.getHost().equals(this.fCurrentHost)) - { - be.expireNow(); - } - } - - LinkedList devNull = new LinkedList(); - this.fBlacklist.drainTo(devNull); - - if (this.fCurrentHost != null) - { - this.fBlacklist.add(new BlacklistEntry(this.fCurrentHost, TimeUnit.MILLISECONDS.convert(blacklistUnit, blacklistTimeUnit))); - } - } - this.fCurrentHost = null; - } - - private String makeSelection() - { - TreeSet workingSet = new TreeSet(this.fBaseHosts); - - LinkedList devNull = new LinkedList(); - this.fBlacklist.drainTo(devNull); - for (BlacklistEntry be : this.fBlacklist) - { - workingSet.remove(be.getHost()); - } - - if (workingSet.size() == 0) - { - log.warn("All hosts were blacklisted; reverting to full set of hosts."); - workingSet.addAll(this.fBaseHosts); - this.fCurrentHost = null; - } - - String selection = null; - if ((this.fCurrentHost != null) && (workingSet.contains(this.fCurrentHost))) - { - selection = this.fCurrentHost; - } - else if ((this.fIdealHost != null) && (workingSet.contains(this.fIdealHost))) - { - selection = this.fIdealHost; - } - else - { - Vector v = new Vector(workingSet); - int index = Math.abs(new Random().nextInt()) % workingSet.size(); - selection = (String)v.elementAt(index); - } - - this.fCurrentHost = selection; - return this.fCurrentHost; - } - - private static Set<String> makeSet(String s) - { - TreeSet set = new TreeSet(); - set.add(s); - return set; } - - private static class BlacklistEntry implements Delayed { - private final String fHost; - private long fExpireAtMs; - - public BlacklistEntry(String host, long delayMs) { - this.fHost = host; - this.fExpireAtMs = (System.currentTimeMillis() + delayMs); - } - - public void expireNow() - { - this.fExpireAtMs = 0L; - } - - public String getHost() - { - return this.fHost; - } - - public int compareTo(Delayed o) - { - Long thisDelay = Long.valueOf(getDelay(TimeUnit.MILLISECONDS)); - return thisDelay.compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS))); - } - - public long getDelay(TimeUnit unit) - { - long remainingMs = this.fExpireAtMs - System.currentTimeMillis(); - return unit.convert(remainingMs, TimeUnit.MILLISECONDS); - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java deleted file mode 100644 index 4f09fce..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java +++ /dev/null @@ -1,56 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse; - -/** - * A MR batching publisher is a publisher with additional functionality - * for managing delayed sends. - * - * @author author - * - */ -public interface MRBatchingPublisher extends MRPublisher -{ - /** - * Get the number of messages that have not yet been sent. - * @return the number of pending messages - */ - int getPendingMessageCount (); - - /** - * Close this publisher, sending any remaining messages. - * @param timeout an amount of time to wait for unsent messages to be sent - * @param timeoutUnits the time unit for the timeout arg - * @return a list of any unsent messages after the timeout - * @throws IOException exception - * @throws InterruptedException exception - */ - List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException; - - MRPublisherResponse sendBatchWithResponse (); -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java deleted file mode 100644 index fea00a1..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java +++ /dev/null @@ -1,66 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import org.slf4j.Logger; - - -public interface MRClient -{ - /** - * An exception at the MR layer. This is used when the HTTP transport - * layer returns a success code but the transaction is not completed as expected. - */ - public class MRApiException extends Exception - { - public MRApiException ( String msg ) { super ( msg ); } - public MRApiException ( String msg, Throwable t ) { super ( msg, t ); } - private static final long serialVersionUID = 1L; - } - - /** - * Optionally set the Logger to use - * @param log log - */ - void logTo ( Logger log ); - - /** - * Set the API credentials for this client connection. Subsequent calls will - * include authentication headers.who i - */ - /** - * @param apiKey apikey - * @param apiSecret apisec - */ - void setApiCredentials ( String apiKey, String apiSecret ); - - /** - * Remove API credentials, if any, on this connection. Subsequent calls will not include - * authentication headers. - */ - void clearApiCredentials (); - - /** - * Close this connection. Some client interfaces have additional close capability. - */ - void close (); -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java deleted file mode 100644 index 572f6d4..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java +++ /dev/null @@ -1,382 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.TreeSet; -import java.util.UUID; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRMetaClient; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A collection of builders for various types of MR API clients - * - * @author author - */ -public class MRClientBuilders -{ - /** - * A builder for a topic Consumer - * @author author - */ - public static class ConsumerBuilder - { - /** - * Construct a consumer builder. - */ - public ConsumerBuilder () {} - - /** - * Set the host list - * @param hostList a comma-separated list of hosts to use to connect to MR - * @return this builder - */ - public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } - - /** - * Set the host list - * @param hostSet a set of hosts to use to connect to MR - * @return this builder - */ - public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } - - /** - * Set the topic - * @param topic the name of the topic to consume - * @return this builder - */ - public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; } - - /** - * Set the consumer's group and ID - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consumer in its group - * @return this builder - */ - public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Set the server side timeout - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. - * @return this builder - */ - public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; }; - - /** - * Set the maximum number of messages to receive per transaction - * @param limit The maximum number of messages to receive from the server in one transaction. - * @return this builder - */ - public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; }; - - /** - * Set a filter to use on the server - * @param filter a Highland Park standard library filter encoded in JSON - * @return this builder - */ - public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; } - - /** - * Build the consumer - * @return a consumer - */ - public MRConsumer build () - { - if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - if ( fGroup == null ) - { - fGroup = UUID.randomUUID ().toString (); - fId = "0"; - log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." ); - } - - if ( sfConsumerMock != null ) return sfConsumerMock; - try { - return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } - - private Collection<String> fHosts = null; - private String fTopic = null; - private String fGroup = null; - private String fId = null; - private String fApiKey = null; - private String fApiSecret = null; - private int fTimeoutMs = -1; - private int fLimit = -1; - private String fFilter = null; - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * A publisher builder - * @author author - */ - public static class PublisherBuilder - { - public PublisherBuilder () {} - - /** - * Set the MR/UEB host(s) to use - * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @return this builder - */ - public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); } - - /** - * Set the MR/UEB host(s) to use - * @param hostSet The host(s) used in the URL to MR. Can be "host:port" - * @return this builder - */ - public PublisherBuilder usingHosts ( String[] hostSet ) - { - final TreeSet<String> hosts = new TreeSet<String> (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); - } - return usingHosts ( hosts ); - } - - /** - * Set the MR/UEB host(s) to use - * @param hostlist The host(s) used in the URL to MR. Can be "host:port". - * @return this builder - */ - public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; } - - /** - * Set the topic to publish on - * @param topic The topic on which to publish messages. - * @return this builder - */ - public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; } - - /** - * Batch message sends with the given limits. - * @param messageCount The largest set of messages to batch. - * @param ageInMs The maximum age of a message waiting in a batch. - * @return this builder - */ - public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; } - - /** - * Compress transactions - * @return this builder - */ - public PublisherBuilder withCompresion () { return enableCompresion(true); } - - /** - * Do not compress transactions - * @return this builder - */ - public PublisherBuilder withoutCompresion () { return enableCompresion(false); } - - /** - * Set the compression option - * @param compress true to gzip compress transactions - * @return this builder - */ - public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Build the publisher - * @return a batching publisher - */ - public MRBatchingPublisher build () - { - if ( fHosts == null || fHosts.size() == 0 || fTopic == null ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - if ( sfPublisherMock != null ) return sfPublisherMock; - - final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls ( fHosts ). - onTopic ( fTopic ). - batchTo ( fMaxBatchSize, fMaxBatchAgeMs ). - compress ( fCompress ). - build (); - if ( fApiKey != null ) - { - pub.setApiCredentials ( fApiKey, fApiSecret ); - } - return pub; - } - - private Collection<String> fHosts = null; - private String fTopic = null; - private int fMaxBatchSize = 1; - private int fMaxBatchAgeMs = 1; - private boolean fCompress = false; - private String fApiKey = null; - private String fApiSecret = null; - } - - /** - * A builder for an identity manager - * @author author - */ - public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> - { - /** - * Construct an identity manager builder. - */ - public IdentityManagerBuilder () {} - - @Override - protected MRIdentityManager constructClient ( Collection<String> hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } } - } - - /** - * A builder for a topic manager - * @author author - */ - public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> - { - /** - * Construct an topic manager builder. - */ - public TopicManagerBuilder () {} - - @Override - protected MRTopicManager constructClient ( Collection<String> hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } } - } - - /** - * Inject a consumer. Used to support unit tests. - * @param cc - */ - public static void $testInject ( MRConsumer cc ) - { - sfConsumerMock = cc; - } - - /** - * Inject a publisher. Used to support unit tests. - * @param pub - */ - public static void $testInject ( MRBatchingPublisher pub ) - { - sfPublisherMock = pub; - } - - static MRConsumer sfConsumerMock = null; - static MRBatchingPublisher sfPublisherMock = null; - - /** - * A builder for an identity manager - * @author author - */ - public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient> - { - /** - * Construct an identity manager builder. - */ - public AbstractAuthenticatedManagerBuilder () {} - - /** - * Set the host list - * @param hostList a comma-separated list of hosts to use to connect to MR - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); } - - /** - * Set the host list - * @param hostSet a set of hosts to use to connect to MR - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; } - - /** - * Set the API key and secret for this client. - * @param apiKey - * @param apiSecret - * @return this builder - */ - public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; } - - /** - * Build the consumer - * @return a consumer - */ - public T build () - { - if ( fHosts == null || fHosts.size() == 0 ) - { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); - } - - final T mgr = constructClient ( fHosts ); - mgr.setApiCredentials ( fApiKey, fApiSecret ); - return mgr; - } - - protected abstract T constructClient ( Collection<String> hosts ); - - private Collection<String> fHosts = null; - private String fApiKey = null; - private String fApiSecret = null; - } - - private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class ); -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java deleted file mode 100644 index 93d50be..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java +++ /dev/null @@ -1,558 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.Map; -import java.util.Properties; -import java.util.TreeSet; -import java.util.UUID; - -import javax.ws.rs.core.MultivaluedMap; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRMetaClient; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants; - -/** - * A factory for MR clients.<br/> - * <br/> - * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates - * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive - * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's - * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across - * them. Be sure to use a different ID for each instance.)<br/> - * <br/> - * Publishers - * - * @author author - */ -public class MRClientFactory -{ - public static MultivaluedMap<String, Object> HTTPHeadersMap; - public static Map<String, String> DME2HeadersMap; - public static String routeFilePath; - - public static FileReader routeReader; - - public static FileWriter routeWriter= null; - public static Properties prop=null; - //routeReader= new FileReader(new File (routeFilePath)); - //props= new Properties(); - /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "hostname:8080," - * - * @param topic The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer ( String hostList, String topic ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic ); - } - - /** - * Create a consumer instance with the default timeout and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * - * @return a consumer - */ - public static MRConsumer createConsumer ( Collection<String> hostSet, String topic ) - { - return createConsumer ( hostSet, topic, null ); - } - - /** - * Create a consumer instance with server-side filtering, the default timeout, and no limit - * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param filter a filter to use on the server side - * - * @return a consumer - */ - public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter ) - { - return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null ); - } - - /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * - * @return a consumer - */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId ) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 ); - } - - /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * - * @return a consumer - */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit) - { - return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null ); - } - - /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostList A comma separated list of hosts to use to connect to MR. - * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup, - consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); - } - - /** - * Create a consumer instance with the default timeout, and no limit - * on messages returned. This consumer can operate in a logical group and is re-startable - * across sessions when you use the same group and ID on restart. This consumer also uses - * server-side filtering. - * - * @param hostSet The host used in the URL to MR. Entries can be "host:port". - * @param topic The topic to consume - * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consume in its group - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout. - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter". - * - * @return a consumer - */ - public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret ) - { - if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock; - try { - return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * Create a publisher that sends each message (or group of messages) immediately. Most - * applications should favor higher latency for much higher message throughput and the - * "simple publisher" is not a good choice. - * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @return a publisher - */ - public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic ) - { - return createBatchingPublisher ( hostlist, topic, 1, 1 ); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. Message payloads are not compressed. - * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs ) - { - return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false ); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * - * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress ); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - final TreeSet<String> hosts = new TreeSet<String> (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); - } - return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress ); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * - * @return a publisher - */ - public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - return new MRSimplerBatchPublisher.Builder (). - againstUrls ( hostSet ). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown. - * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param topic The topic on which to publish messages. - * @param username username - * @param password password - * @param maxBatchSize The largest set of messages to batch - * @param maxAgeMs The maximum age of a message waiting in a batch - * @param compress use gzip compression - * @param protocolFlag http auth or ueb auth or dme2 method - * @param producerFilePath all properties for publisher - * @return MRBatchingPublisher obj - */ - public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath ) - { - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(host)). - onTopic ( topic ). - batchTo ( maxBatchSize, maxAgeMs ). - compress ( compress ). - build (); - - pub.setHost(host); - pub.setUsername(username); - pub.setPassword(password); - pub.setProtocolFlag(protocolFlag); - pub.setProducerFilePath(producerFilePath); - return pub; - } - - - /** - * Create a publisher that batches messages. Be sure to close the publisher to - * send the last batch and ensure a clean shutdown - * @param producerFilePath set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex - */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); - props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - build (); - pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - }else{ - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProducerFilePath(producerFilePath); - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); - } - //pub.setContentType(contentType); - return pub; - } - - /** - * Create a publisher that will contain send methods that return - * response object to user. - * @param producerFilePath set all properties for publishing message - * @return MRBatchingPublisher obj - * @throws FileNotFoundException exc - * @throws IOException ioex - */ - public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); - props.load(reader); - MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))). - onTopic ( props.getProperty("topic") ). - batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())). - compress (Boolean.parseBoolean(props.getProperty("compress"))). - httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))). - withResponse(withResponse). - build (); - pub.setHost(props.getProperty("host")); - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - - pub.setAuthKey(props.getProperty("authKey")); - pub.setAuthDate(props.getProperty("authDate")); - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - }else{ - pub.setUsername(props.getProperty("username")); - pub.setPassword(props.getProperty("password")); - } - pub.setProducerFilePath(producerFilePath); - pub.setProtocolFlag(props.getProperty("TransportType")); - pub.setProps(props); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); - } - //pub.setContentType(contentType); - return pub; - } - - - - - - - - - - - - /** - * Create an identity manager client to work with API keys. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret - * @return an identity manager - */ - public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret ) - { - MRIdentityManager cim; - try { - cim = new MRMetaClient ( hostSet ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - cim.setApiCredentials ( apiKey, apiSecret ); - return cim; - } - - /** - * Create a topic manager for working with topics. - * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover. - * @param apiKey Your API key - * @param apiSecret Your API secret - * @return a topic manager - */ - public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret ) - { - MRMetaClient tmi; - try { - tmi = new MRMetaClient ( hostSet ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - tmi.setApiCredentials ( apiKey, apiSecret ); - return tmi; - } - - /** - * Inject a consumer. Used to support unit tests. - * @param cc - */ - public static void $testInject ( MRConsumer cc ) - { - MRClientBuilders.sfConsumerMock = cc; - } - - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String host, String topic, String username, - String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) { - - MRConsumerImpl sub; - try { - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - sub.setUsername(username); - sub.setPassword(password); - sub.setHost(host); - sub.setProtocolFlag(protocalFlag); - sub.setConsumerFilePath(consumerFilePath); - return sub; - - } - - public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException { - FileReader reader = new FileReader(new File (consumerFilePath)); - Properties props = new Properties(); - props.load(reader); - int timeout; - if(props.getProperty("timeout")!=null) - timeout=Integer.parseInt(props.getProperty("timeout")); - else - timeout=-1; - int limit; - if(props.getProperty("limit")!=null) - limit=Integer.parseInt(props.getProperty("limit")); - else - limit=-1; - String group; - if(props.getProperty("group")==null) - group=UUID.randomUUID ().toString(); - else - group=props.getProperty("group"); - MRConsumerImpl sub=null; - if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") ); - sub.setAuthKey(props.getProperty("authKey")); - sub.setAuthDate(props.getProperty("authDate")); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - }else{ - sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") ); - sub.setUsername(props.getProperty("username")); - sub.setPassword(props.getProperty("password")); - } - sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath")); - sub.setProps(props); - sub.setHost(props.getProperty("host")); - sub.setProtocolFlag(props.getProperty("TransportType")); - //sub.setConsumerFilePath(consumerFilePath); - sub.setfFilter(props.getProperty("filter")); - routeFilePath=props.getProperty("DME2preferredRouterFilePath"); - routeReader= new FileReader(new File (routeFilePath)); - prop= new Properties(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File(routeFilePath)); - } - return sub; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java deleted file mode 100644 index 214552b..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.IOException; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse; - -public interface MRConsumer extends MRClient -{ - /** - * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call. - - * @return a set of messages - * @throws IOException - */ - Iterable<String> fetch () throws IOException, Exception; - - /** - * Fetch a set of messages with an explicit timeout and limit for this call. These values - * override any set in the constructor call. - * - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection - * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side). - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @return a set messages - * @throws IOException if there's a problem connecting to the server - */ - Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException, Exception; - - MRConsumerResponse fetchWithReturnConsumerResponse (); - - - MRConsumerResponse fetchWithReturnConsumerResponse ( int timeoutMs, int limit ); -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java deleted file mode 100644 index 34826c9..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java +++ /dev/null @@ -1,100 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.IOException; - -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; - -/** - * A client for manipulating API keys. - * @author author - * - */ -public interface MRIdentityManager extends MRClient -{ - /** - * An API Key record - */ - public interface ApiKey - { - /** - * Get the email address associated with the API key - * @return the email address on the API key or null - */ - String getEmail (); - - /** - * Get the description associated with the API key - * @return the description on the API key or null - */ - String getDescription (); - } - - /** - * Create a new API key on the UEB cluster. The returned credential instance - * contains the new API key and API secret. This is the only time the secret - * is available to the client -- there's no API for retrieving it later -- so - * your application must store it securely. - * - * @param email - * @param description - * @return a new credential - * @throws HttpException - * @throws MRApiException - * @throws IOException - */ - ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException; - - /** - * Get basic info about a known API key - * @param apiKey - * @return the API key's info or null if it doesn't exist - * @throws HttpObjectNotFoundException, HttpException, MRApiException - * @throws IOException - */ - ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, MRApiException, IOException; - - /** - * Update the record for the API key used to authenticate this request. The UEB - * API requires that you authenticate with the same key you're updating, so the - * API key being changed is the one used for setApiCredentials. - * - * @param email use null to keep the current value - * @param description use null to keep the current value - * @throws IOException - * @throws HttpException - * @throws HttpObjectNotFoundException - */ - void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException; - - /** - * Delete the *current* API key. After this call returns, the API key - * used to authenticate will no longer be valid. - * - * @throws IOException - * @throws HttpException - */ - void deleteCurrentApiKey () throws HttpException, IOException; -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java deleted file mode 100644 index 165bf0f..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java +++ /dev/null @@ -1,93 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.IOException; -import java.util.Collection; - -/** - * A MR publishing interface. - * - */ -public interface MRPublisher extends MRClient -{ - /** - * A simple message container - */ - public static class message - { - public message ( String partition, String msg ) - { - fPartition = partition == null ? "" : partition; - fMsg = msg; - if ( fMsg == null ) - { - throw new IllegalArgumentException ( "Can't send a null message." ); - } - } - - public message ( message msg ) - { - this ( msg.fPartition, msg.fMsg ); - } - - public final String fPartition; - public final String fMsg; - } - - /** - * Send the given message without partition. partition will be placed at HTTP request level. - * @param msg message to sent - * @return the number of pending messages - * @throws IOException exception - */ - int send ( String msg ) throws IOException; - /** - * Send the given message using the given partition. - * @param partition partition - * @param msg message - * @return the number of pending messages - * @throws IOException exception - */ - int send ( String partition, String msg ) throws IOException; - - /** - * Send the given message using its partition. - * @param msg mesg - * @return the number of pending messages - * @throws IOException exp - */ - int send ( message msg ) throws IOException; - - /** - * Send the given messages using their partitions. - * @param msgs msg - * @return the number of pending messages - * @throws IOException exp - */ - int send ( Collection<message> msgs ) throws IOException; - - /** - * Close this publisher. It's an error to call send() after close() - */ - void close (); -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java deleted file mode 100644 index 3703385..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java +++ /dev/null @@ -1,183 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client; - -import java.io.IOException; -import java.util.Set; - -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; - - -/** - * A client for working with topic metadata. - * @author author - */ -public interface MRTopicManager extends MRClient -{ - /** - * Get the topics available in the cluster - * @return a set of topic names - * @throws IOException - */ - Set<String> getTopics () throws IOException; - - /** - * Information about a topic. - */ - public interface TopicInfo - { - /** - * Get the owner of the topic - * @return the owner, or null if no entry - */ - String getOwner (); - - /** - * Get the description for this topic - * @return the description, or null if no entry - */ - String getDescription (); - - /** - * Get the set of allowed producers (as API keys) on this topic - * @return the set of allowed producers, null of no ACL exists/enabled - */ - Set<String> getAllowedProducers (); - - /** - * Get the set of allowed consumers (as API keys) on this topic - * @return the set of allowed consumers, null of no ACL exists/enabled - */ - Set<String> getAllowedConsumers (); - } - - /** - * Get information about a topic. - * @param topic - * @return topic information - * @throws IOException - * @throws HttpObjectNotFoundException - */ - TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException; - - /** - * Create a new topic. - * @param topicName - * @param topicDescription - * @param partitionCount - * @param replicationCount - * @throws HttpException - * @throws IOException - */ - void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException; - - /** - * Delete the topic. This call must be authenticated and the API key listed as owner on the topic. - * NOTE: The MR (UEB) API server does not support topic deletion at this time (mid 2015) - * @param topic - * @throws HttpException - * @throws IOException - * @deprecated If/when the Kafka system supports topic delete, or the implementation changes, this will be restored. - */ - @Deprecated - void deleteTopic ( String topic ) throws HttpException, IOException; - - /** - * Can any client produce events into this topic without authentication? - * @param topic - * @return true if the topic is open for producing - * @throws IOException - * @throws HttpObjectNotFoundException - */ - boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException; - - /** - * Get the set of allowed producers. If the topic is open, the result is null. - * @param topic - * @return a set of allowed producers or null - * @throws IOException - * @throws HttpObjectNotFoundException - */ - Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException; - - /** - * Allow the given API key to produce messages on the given topic. The caller must - * own this topic. - * @param topic - * @param apiKey - * @throws HttpException - * @throws HttpObjectNotFoundException - * @throws IOException - */ - void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException; - - /** - * Revoke the given API key's authorization to produce messages on the given topic. - * The caller must own this topic. - * @param topic - * @param apiKey - * @throws HttpException - * @throws IOException - */ - void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException; - - /** - * Can any client consume events from this topic without authentication? - * @param topic - * @return true if the topic is open for consuming - * @throws IOException - * @throws HttpObjectNotFoundException - */ - boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException; - - /** - * Get the set of allowed consumers. If the topic is open, the result is null. - * @param topic - * @return a set of allowed consumers or null - * @throws IOException - * @throws HttpObjectNotFoundException - */ - Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException; - - /** - * Allow the given API key to consume messages on the given topic. The caller must - * own this topic. - * @param topic - * @param apiKey - * @throws HttpException - * @throws HttpObjectNotFoundException - * @throws IOException - */ - void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException; - - /** - * Revoke the given API key's authorization to consume messages on the given topic. - * The caller must own this topic. - * @param topic - * @param apiKey - * @throws HttpException - * @throws IOException - */ - void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException; -} - diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java deleted file mode 100644 index e1ea06e..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java +++ /dev/null @@ -1,63 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -public class Clock -{ - public synchronized static Clock getIt () - { - if ( sfClock == null ) - { - sfClock = new Clock (); - } - return sfClock; - } - - /** - * Get the system's current time in milliseconds. - * @return the current time - */ - public static long now () - { - return getIt().nowImpl (); - } - - /** - * Get current time in milliseconds - * @return current time in ms - */ - protected long nowImpl () - { - return System.currentTimeMillis (); - } - - protected Clock () - { - } - - private static Clock sfClock = null; - - protected synchronized static void register ( Clock testClock ) - { - sfClock = testClock; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java deleted file mode 100644 index 405ed90..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java +++ /dev/null @@ -1,392 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; - -import org.apache.http.HttpException; -import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; -import org.glassfish.jersey.internal.util.Base64; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.apiClient.http.CacheUse; -import com.att.nsa.apiClient.http.HttpClient; - -public class MRBaseClient extends HttpClient implements MRClient -{ - - private static final String MR_AUTH_CONSTANT = "X-CambriaAuth"; - private static final String MR_DATE_CONSTANT = "X-CambriaDate"; - - protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException - { - super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort ); - } - - protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException { - super ( ConnectionType.HTTP,hosts, stdSvcPort); - - fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - } - - protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException - { - super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); - - fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - } - - - @Override - public void close () - { - } - - protected Set<String> jsonArrayToSet ( JSONArray a ) - { - if ( a == null ) return null; - - final TreeSet<String> set = new TreeSet<String> (); - for ( int i=0; i<a.length (); i++ ) - { - set.add ( a.getString ( i )); - } - return set; - } - - public void logTo ( Logger log ) - { - fLog = log; - replaceLogger ( log ); - } - - protected Logger getLog () - { - return fLog; - } - - private Logger fLog; - - public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ - if ((null != username && null != password)) { - WebTarget target = null; - - Response response = null; - - target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - - response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); - - return getResponseDataInJson(response); - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); - } - } - public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ - String responseData = null; - if ((null != username && null != password)) { - WebTarget target = null; - - Response response = null; - - target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - - response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType)); - - responseData = response.readEntity(String.class); - return responseData; - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); - } - } - public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ - if ((null != username && null != password)) { - WebTarget target = null; - - Response response = null; - target= getTarget(path,username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .post(Entity.entity(data, contentType)); - - return getResponseDataInJson(response); - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); - } - } - public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{ - String responseData = null; - if ((null != username && null != password)) { - WebTarget target = null; - - Response response = null; - target= getTarget(path,username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .post(Entity.entity(data, contentType)); - responseData = response.readEntity(String.class); - return responseData; - - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); - } - } - - - public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { - if (null != username && null != password) { - - WebTarget target = null; - - Response response = null; - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target=getTarget(path); - response = target.request() - .header(MR_AUTH_CONSTANT, username) - .header(MR_DATE_CONSTANT, password) - .get(); - } else { - target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - - response = target.request().header("Authorization", "Basic " + encoding).get(); - - } - return getResponseDataInJson(response); - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); - } - } - - - public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - if (null != username && null != password) { - - WebTarget target = null; - - Response response = null; - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target=getTarget(path); - response = target.request() - .header(MR_AUTH_CONSTANT, username) - .header(MR_DATE_CONSTANT, password) - .get(); - } else { - target = getTarget(path, username, password); - String encoding = Base64.encodeAsString(username+":"+password); - response = target.request().header("Authorization", "Basic " + encoding).get(); - } - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); - } - - responseData = response.readEntity(String.class); - return responseData; - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); - } - } - - public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { - if (null != username && null != password) { - - WebTarget target = null; - - Response response = null; - target=getTarget(path, username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .get(); - - return getResponseDataInJson(response); - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); - } - } - - public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - if (null != username && null != password) { - - WebTarget target = null; - - Response response = null; - target=getTarget(path, username, password); - response = target.request() - .header(MR_AUTH_CONSTANT, authKey) - .header(MR_DATE_CONSTANT, authDate) - .get(); - - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); - } - - responseData = response.readEntity(String.class); - return responseData; - } else { - throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); - } - } - - private WebTarget getTarget(final String path, final String username, final String password) { - - Client client = ClientBuilder.newClient(); - - - // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types. - HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); - client.register(feature); - - return client.target(path); - } - - - private WebTarget getTarget(final String path) { - - Client client = ClientBuilder.newClient(); - return client.target(path); - } - private JSONObject getResponseDataInJson(Response response) throws JSONException { - try { - MRClientFactory.HTTPHeadersMap=response.getHeaders(); - // fLog.info("DMAAP response status: " + response.getStatus()); - - - //MultivaluedMap<String, Object> headersMap = response.getHeaders(); - //for(String key : headersMap.keySet()) { - String transactionid=response.getHeaderString("transactionid"); - if (transactionid!=null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); - } - - /*final String responseData = response.readEntity(String.class); - JSONTokener jsonTokener = new JSONTokener(responseData); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else { - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject;*/ - - - if(response.getStatus()==403) { - JSONObject jsonObject = null; - jsonObject = new JSONObject(); - JSONArray jsonArray = new JSONArray(); - jsonArray.put(response.getEntity()); - jsonObject.put("result", jsonArray); - jsonObject.put("status", response.getStatus()); - return jsonObject; - } - String responseData = response.readEntity(String.class); - - JSONTokener jsonTokener = new JSONTokener(responseData); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - jsonObject.put("status", response.getStatus()); - } else { - jsonObject = new JSONObject(jsonTokener); - jsonObject.put("status", response.getStatus()); - } - - return jsonObject; - } catch (JSONException excp) { - fLog.error("DMAAP - Error reading response data.", excp); - return null; - } - - } - - public String getHTTPErrorResponseMessage(String responseString){ - - String response = null; - int beginIndex = 0; - int endIndex = 0; - if(responseString.contains("<body>")){ - - beginIndex = responseString.indexOf("body>")+5; - endIndex = responseString.indexOf("</body"); - response = responseString.substring(beginIndex,endIndex); - } - - return response; - - } - - public String getHTTPErrorResponseCode(String responseString){ - - String response = null; - int beginIndex = 0; - int endIndex = 0; - if(responseString.contains("<title>")){ - beginIndex = responseString.indexOf("title>")+6; - endIndex = responseString.indexOf("</title"); - response = responseString.substring(beginIndex,endIndex); - } - - return response; - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java deleted file mode 100644 index 28affcc..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java +++ /dev/null @@ -1,485 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import java.util.zip.GZIPOutputStream; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.apiClient.http.HttpClient; -import com.att.nsa.apiClient.http.HttpException; - -/** - * This is a batching publisher class that allows the client to publish messages - * in batches that are limited in terms of size and/or hold time. - * - * @author author - * @deprecated This class's tricky locking doesn't quite work - * - */ -@Deprecated -public class MRBatchPublisher implements MRBatchingPublisher -{ - public static final long kMinMaxAgeMs = 1; - - /** - * Create a batch publisher. - * - * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path. - * @param topic the topic to publish to - * @param maxBatchSize the maximum size of a batch - * @param maxAgeMs the maximum age of a batch - */ - public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - if ( maxAgeMs < kMinMaxAgeMs ) - { - fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs ); - maxAgeMs = kMinMaxAgeMs; - } - - try { - fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress ); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - - // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for - // the oldest msg to hit max age? (locking is complicated, but should be do-able) - fExec = new ScheduledThreadPoolExecutor ( 1 ); - fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS ); - } - - @Override - public void setApiCredentials ( String apiKey, String apiSecret ) - { - fSender.setApiCredentials ( apiKey, apiSecret ); - } - - @Override - public void clearApiCredentials () - { - fSender.clearApiCredentials (); - } - - /** - * Send the given message with the given partition - * @param partition - * @param msg - * @throws IOException - */ - @Override - public int send ( String partition, String msg ) throws IOException - { - return send ( new message ( partition, msg ) ); - } - @Override - public int send ( String msg ) throws IOException - { - return send ( new message ( "",msg ) ); - } - /** - * Send the given message - * @param userMsg a message - * @throws IOException - */ - @Override - public int send ( message userMsg ) throws IOException - { - final LinkedList<message> list = new LinkedList<message> (); - list.add ( userMsg ); - return send ( list ); - } - - /** - * Send the given set of messages - * @param msgs the set of messages, sent in order of iteration - * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing) - * @throws IOException - */ - @Override - public int send ( Collection<message> msgs ) throws IOException - { - if ( msgs.size() > 0 ) - { - fSender.queue ( msgs ); - } - return fSender.size (); - } - - @Override - public int getPendingMessageCount () - { - return fSender.size (); - } - - /** - * Send any pending messages and close this publisher. - * @throws IOException - * @throws InterruptedException - */ - @Override - public void close () - { - try - { - final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); - if ( remains.size() > 0 ) - { - fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. " - + "(Consider using the alternate close method to capture unsent messages in this case.)" ); - } - } - catch ( InterruptedException e ) - { - fLog.warn ( "Possible message loss. " + e.getMessage(), e ); - } - catch ( IOException e ) - { - fLog.warn ( "Possible message loss. " + e.getMessage(), e ); - } - } - - public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException - { - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); - fExec.shutdown (); - - final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); - final long timeoutAtMs = System.currentTimeMillis () + waitInMs; - while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 ) - { - fSender.checkSend ( true ); - Thread.sleep ( 250 ); - } - - final LinkedList<message> result = new LinkedList<message> (); - fSender.drainTo ( result ); - return result; - } - - private final ScheduledThreadPoolExecutor fExec; - private final Sender fSender; - - private static class TimestampedMessage extends message - { - public TimestampedMessage ( message m ) - { - super ( m ); - timestamp = System.currentTimeMillis (); - } - public final long timestamp; - } - - private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class ); - - private class Sender extends MRBaseClient implements Runnable - { - public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException - { - super ( baseUrls ); - - fNextBatch = new LinkedList<TimestampedMessage> (); - fSendingBatch = null; - fTopic = topic; - fMaxBatchSize = maxBatch; - fMaxAgeMs = maxAgeMs; - fCompress = compress; - fLock = new ReentrantReadWriteLock (); - fWriteLock = fLock.writeLock (); - fReadLock = fLock.readLock (); - fDontSendUntilMs = 0; - } - - public void drainTo ( LinkedList<message> list ) - { - fWriteLock.lock (); - try - { - if ( fSendingBatch != null ) - { - list.addAll ( fSendingBatch ); - } - list.addAll ( fNextBatch ); - - fSendingBatch = null; - fNextBatch.clear (); - } - finally - { - fWriteLock.unlock (); - } - } - - /** - * Called periodically by the background executor. - */ - @Override - public void run () - { - try - { - checkSend ( false ); - } - catch ( IOException e ) - { - fLog.warn ( "MR background send: " + e.getMessage () ); - } - } - - public int size () - { - fReadLock.lock (); - try - { - return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () ); - } - finally - { - fReadLock.unlock (); - } - } - - /** - * Called to queue a message. - * @param m - * @throws IOException - */ - public void queue ( Collection<message> msgs ) throws IOException - { - fWriteLock.lock (); - try - { - for ( message userMsg : msgs ) - { - if ( userMsg != null ) - { - fNextBatch.add ( new TimestampedMessage ( userMsg ) ); - } - else - { - fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." ); - } - } - } - finally - { - fWriteLock.unlock(); - } - checkSend ( false ); - } - - /** - * Send a batch if the queue is long enough, or the first pending message is old enough. - * @param force - * @throws IOException - */ - public void checkSend ( boolean force ) throws IOException - { - // hold a read lock just long enough to evaluate whether a batch - // should be sent - boolean shouldSend = false; - fReadLock.lock (); - try - { - if ( fNextBatch.size () > 0 ) - { - final long nowMs = System.currentTimeMillis (); - shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize ); - if ( !shouldSend ) - { - final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs; - shouldSend = sendAtMs <= nowMs; - } - - // however, unless forced, wait after an error - shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs ); - } - // else: even in 'force', there's nothing to send, so shouldSend=false is fine - } - finally - { - fReadLock.unlock (); - } - - // if a send is required, acquire a write lock, swap out the next batch, - // swap in a fresh batch, and release the lock for the caller to start - // filling a batch again. After releasing the lock, send the current - // batch. (There could be more messages added between read unlock and - // write lock, but that's fine.) - if ( shouldSend ) - { - fSendingBatch = null; - - fWriteLock.lock (); - try - { - fSendingBatch = fNextBatch; - fNextBatch = new LinkedList<TimestampedMessage> (); - } - finally - { - fWriteLock.unlock (); - } - - if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) ) - { - fLog.warn ( "Send failed, rebuilding send queue." ); - - // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis (); - - // the send failed. reconstruct the pending queue - fWriteLock.lock (); - try - { - final LinkedList<TimestampedMessage> nextGroup = fNextBatch; - fNextBatch = fSendingBatch; - fNextBatch.addAll ( nextGroup ); - fSendingBatch = null; - fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." ); - } - finally - { - fWriteLock.unlock (); - } - } - else - { - fWriteLock.lock (); - try - { - fSendingBatch = null; - } - finally - { - fWriteLock.unlock (); - } - } - } - } - - private LinkedList<TimestampedMessage> fNextBatch; - private LinkedList<TimestampedMessage> fSendingBatch; - private final String fTopic; - private final int fMaxBatchSize; - private final long fMaxAgeMs; - private final boolean fCompress; - private final ReentrantReadWriteLock fLock; - private final WriteLock fWriteLock; - private final ReadLock fReadLock; - private long fDontSendUntilMs; - private static final long sfWaitAfterError = 1000; - } - - // this is static so that it's clearly not using any mutable member data outside of a lock - private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log ) - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( toSend.size() < 1 ) - { - return true; - } - - final long nowMs = System.currentTimeMillis (); - final String url = MRConstants.makeUrl ( topic ); - - log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" ); - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - try - { - OutputStream os = baseStream; - if ( compress ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : toSend ) - { - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - } - catch ( IOException e ) - { - log.warn ( "Problem writing stream to post: " + e.getMessage () ); - return false; - } - - boolean result = false; - final long startMs = System.currentTimeMillis (); - try - { - client.post ( url, compress ? - MRFormat.CAMBRIA_ZIP.toString () : - MRFormat.CAMBRIA.toString (), - baseStream.toByteArray(), false ); - result = true; - } - catch ( HttpException e ) - { - log.warn ( "Problem posting to MR: " + e.getMessage() ); - } - catch ( IOException e ) - { - log.warn ( "Problem posting to MR: " + e.getMessage() ); - } - - log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" ); - return result; - } - - @Override - public void logTo ( Logger log ) - { - fLog = log; - } - - @Override - public MRPublisherResponse sendBatchWithResponse() { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java deleted file mode 100644 index 3b68363..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class MRClientVersionInfo -{ - public static String getVersion () - { - return version; - } - - private static final Properties props = new Properties(); - private static final String version; - static - { - String use = null; - try - { - final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" ); - if ( is != null ) - { - props.load ( is ); - use = props.getProperty ( "MRClientVersion", null ); - } - } - catch ( IOException e ) - { - } - version = use; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java deleted file mode 100644 index 4039a0b..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java +++ /dev/null @@ -1,180 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.http.HttpHost; - -class MRConstants -{ - private static final String PROTOCOL = "http"; - public static final String context = "/"; - public static final String kBasePath = "events/"; - //public static final int kStdMRServicePort = 3904; - public static final int kStdMRServicePort = 8080; - - public static String escape ( String s ) - { - try - { - return URLEncoder.encode ( s, "UTF-8"); - } - catch ( UnsupportedEncodingException e ) - { - throw new RuntimeException ( e ); - } - } - - public static String makeUrl ( String rawTopic ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(). - append ( MRConstants.context ). - append ( MRConstants.kBasePath ). - append ( cleanTopic ); - return url.toString (); - } - - public static String makeUrl ( final String host, final String rawTopic ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(); - - if (!host.startsWith("http") || !host.startsWith("https") ) { - url.append( PROTOCOL + "://" ); - } - url.append(host); - url.append ( MRConstants.context ); - url.append ( MRConstants.kBasePath ); - url.append ( cleanTopic ); - return url.toString (); - } - - public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(); - - if (transferprotocol !=null && !transferprotocol.equals("")) { - url.append( transferprotocol + "://" ); - }else{ - url.append( PROTOCOL + "://" ); - } - url.append(host); - url.append ( MRConstants.context ); - url.append ( MRConstants.kBasePath ); - url.append ( cleanTopic ); - if(parttion!=null && !parttion.equalsIgnoreCase("")) - url.append("?partitionKey=").append(parttion); - return url.toString (); - } - public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) - { - final String cleanConsumerGroup = escape ( rawConsumerGroup ); - final String cleanConsumerId = escape ( rawConsumerId ); - return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; - } - - /** - * Create a list of HttpHosts from an input list of strings. Input strings have - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param hosts - * @return a list of hosts - */ - public static List<HttpHost> createHostsList(Collection<String> hosts) - { - final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); - for ( String host : hosts ) - { - if ( host.length () == 0 ) continue; - convertedHosts.add ( hostForString ( host ) ); - } - return convertedHosts; - } - - /** - * Return an HttpHost from an input string. Input string has - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param hosts - * @return a list of hosts - */ - public static HttpHost hostForString ( String host ) - { - if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); - - String hostPart = host; - int port = kStdMRServicePort; - - final int colon = host.indexOf ( ':' ); - if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); - if ( colon > 0 ) - { - hostPart = host.substring ( 0, colon ).trim(); - - final String portPart = host.substring ( colon + 1 ).trim(); - if ( portPart.length () > 0 ) - { - try - { - port = Integer.parseInt ( portPart ); - } - catch ( NumberFormatException x ) - { - throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x ); - } - } - // else: use default port on "foo:" - } - - return new HttpHost ( hostPart, port ); - } - - public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) { - final String cleanConsumerGroup = escape ( fGroup ); - final String cleanConsumerId = escape ( fId ); - - StringBuffer url = new StringBuffer(); - - if (transferprotocol !=null && !transferprotocol.equals("")) { - url.append( transferprotocol + "://" ); - }else{ - url.append( PROTOCOL + "://" ); - } - - url.append(host); - url.append(context); - url.append(kBasePath); - url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId); - - return url.toString(); - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java deleted file mode 100644 index 8f4d777..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java +++ /dev/null @@ -1,709 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URLEncoder; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import org.apache.http.HttpException; -import org.apache.http.HttpStatus; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.aft.dme2.api.DME2Client; -import com.att.aft.dme2.api.DME2Exception; - -public class MRConsumerImpl extends MRBaseClient implements MRConsumer -{ - - private static final String SUCCESS_MESSAGE = "Success"; - - - private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - public static List<String> stringToList ( String str ) - { - final LinkedList<String> set = new LinkedList<String> (); - if ( str != null ) - { - final String[] parts = str.trim ().split ( "," ); - for ( String part : parts ) - { - final String trimmed = part.trim(); - if ( trimmed.length () > 0 ) - { - set.add ( trimmed ); - } - } - } - return set; - } - - public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException - { - this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false ); - } - - public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException - { - super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId ); - - fTopic = topic; - fGroup = consumerGroup; - fId = consumerId; - fTimeoutMs = timeoutMs; - fLimit = limit; - fFilter = filter; - - //setApiCredentials ( apiKey, apiSecret ); - } - - @Override - public Iterable<String> fetch () throws IOException,Exception - { - // fetch with the timeout and limit set in constructor - return fetch ( fTimeoutMs, fLimit ); - } - - @Override - public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception - { - final LinkedList<String> msgs = new LinkedList<String> (); - -// FIXME: the timeout on the socket needs to be at least as long as the long poll -// // sanity check for long poll timeout vs. socket read timeout -// final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10; -// if ( timeoutMs > maxReasonableTimeoutMs ) -// { -// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" + -// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." ); -// timeoutMs = maxReasonableTimeoutMs; -// } - - // final String urlPath = createUrlPath ( timeoutMs, limit ); - - //getLog().info ( "UEB GET " + urlPath ); - try - { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - DMEConfigure(timeoutMs, limit); - try - { - //getLog().info ( "Receiving msgs from: " + url+subContextPath ); - String reply = sender.sendAndWait(timeoutMs+10000L); - // System.out.println("Message received = "+reply); - final JSONObject o =getResponseDataInJson(reply); - //msgs.add(reply); - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - // final int b = o.getInt("status" ); - //if ( a != null && a.length()>0 ) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - //msgs.add("DMAAP response status: "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); - else - msgs.add ( a.getJSONObject(i).toString() ); - - - } - } -// else if(a != null && a.length()<1){ -// msgs.add ("[]"); -// } - } - } - catch ( JSONException e ) - { - // unexpected response - reportProblemWithResponse (); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit ); - - - try - { - final JSONObject o = get ( urlPath, username, password, protocolFlag ); - - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - final int b = o.getInt("status" ); - //if ( a != null && a.length()>0 ) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - msgs.add("DMAAP response status: "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); - else - msgs.add ( a.getJSONObject(i).toString() ); - - } - } -// else if(a != null && a.length()<1) -// { -// msgs.add ("[]"); -// } - } - } - catch ( JSONException e ) - { - // unexpected response - reportProblemWithResponse (); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit ); - - - try - { - final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag ); - if ( o != null ) - { - final JSONArray a = o.getJSONArray ( "result" ); - final int b = o.getInt("status" ); - //if ( a != null && a.length()>0) - if ( a != null) - { - for ( int i=0; i<a.length (); i++ ) - { - msgs.add("DMAAP response status: "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add ( a.getString(i) ); - else - msgs.add ( a.getJSONObject(i).toString() ); - - } - } -// else if(a != null && a.length()<1){ -// msgs.add ("[]"); -// } - } - } - catch ( JSONException e ) - { - // unexpected response - reportProblemWithResponse (); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - - } - - } catch ( JSONException e ) { - // unexpected response - reportProblemWithResponse (); - } catch (HttpException e) { - throw new IOException(e); - } catch (Exception e ) { - throw e; - } - - - return msgs; - } - - private JSONObject getResponseDataInJson(String response) { - try { - - - //fLog.info("DMAAP response status: " + response.getStatus()); - - // final String responseData = response.readEntity(String.class); - JSONTokener jsonTokener = new JSONTokener(response); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else { - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject; - } catch (JSONException excp) { - // fLog.error("DMAAP - Error reading response data.", excp); - return null; - } - - - -} - - private JSONObject getResponseDataInJsonWithResponseReturned(String response) { - JSONTokener jsonTokener = new JSONTokener(response); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if(null != response && response.length()==0){ - return null; - } - - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else if('{' == firstChar){ - return null; - } else if('<' == firstChar){ - return null; - }else{ - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject; - - } - - private final String fTopic; - private final String fGroup; - private final String fId; - private final int fTimeoutMs; - private final int fLimit; - private String fFilter; - private String username; - private String password; - private String host; - private String latitude; - private String longitude; - private String version; - private String serviceName; - private String env; - private String partner; - private String routeOffer; - private String subContextPath; - private String protocol; - private String methodType; - private String url; - private String dmeuser; - private String dmepassword; - private String contenttype; - private DME2Client sender; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String consumerFilePath; - private String authKey; - private String authDate; - private Properties props; - private HashMap<String, String> DMETimeOuts; - private String handlers; - public static String routerFilePath; - public static String getRouterFilePath() { - return routerFilePath; - } - - public static void setRouterFilePath(String routerFilePath) { - MRSimplerBatchPublisher.routerFilePath = routerFilePath; - } - public String getConsumerFilePath() { - return consumerFilePath; - } - - public void setConsumerFilePath(String consumerFilePath) { - this.consumerFilePath = consumerFilePath; - } - - public String getProtocolFlag() { - return protocolFlag; - } - - public void setProtocolFlag(String protocolFlag) { - this.protocolFlag = protocolFlag; - } - - private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ - latitude = props.getProperty("Latitude"); - longitude = props.getProperty("Longitude"); - version = props.getProperty("Version"); - serviceName = props.getProperty("ServiceName"); - env = props.getProperty("Environment"); - partner = props.getProperty("Partner"); - routeOffer = props.getProperty("routeOffer"); - - subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId; - // subContextPath=createUrlPath (subContextPath, timeoutMs, limit); - //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs); - - protocol = props.getProperty("Protocol"); - methodType = props.getProperty("MethodType"); - dmeuser = props.getProperty("username"); - dmepassword = props.getProperty("password"); - contenttype = props.getProperty("contenttype"); - handlers = props.getProperty("sessionstickinessrequired"); - //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner; - // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner; - - /** - * Changes to DME2Client url to use Partner for auto failover between data centers - * When Partner value is not provided use the routeOffer value for auto failover within a cluster - */ - - String preferredRouteKey = readRoute("preferredRouteKey"); - - if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; - }else if (partner != null && !partner.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; - } - else if (routeOffer!=null && !routeOffer.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; - } - - //fLog.info("url :"+url); - - if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs; - if(limit != -1 )url=url+"&limit="+limit; - - DMETimeOuts = new HashMap<String, String>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - DMETimeOuts.put("Content-Type", contenttype); - System.setProperty("AFT_LATITUDE", latitude); - System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); - // System.setProperty("DME2.DEBUG", "true"); - - //SSL changes - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - "SSLv3,TLSv1,TLSv1.1"); - System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); - System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - //SSL changes - - sender = new DME2Client(new URI(url), timeoutMs+10000L); - sender.setAllowAllHttpReturnCodes(true); - sender.setMethod(methodType); - sender.setSubContext(subContextPath); - if(dmeuser != null && dmepassword != null){ - sender.setCredentials(dmeuser, dmepassword); - //System.out.println(dmepassword); - } - sender.setHeaders(DMETimeOuts); - sender.setPayload(""); - - if(handlers.equalsIgnoreCase("yes")){ - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - }else{ - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler"); - } - /* HeaderReplyHandler headerhandler= new HeaderReplyHandler(); - sender.setReplyHandler(headerhandler);*/ -// } catch (DME2Exception x) { -// getLog().warn(x.getMessage(), x); -// System.out.println("XXXXXXXXXXXX"+x); -// } catch (URISyntaxException x) { -// System.out.println(x); -// getLog().warn(x.getMessage(), x); -// } catch (Exception x) { -// System.out.println("XXXXXXXXXXXX"+x); -// getLog().warn(x.getMessage(), x); -// } - } - public Properties getProps() { - return props; - } - - public void setProps(Properties props) { - this.props = props; - } - - protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException - { - final StringBuffer contexturl= new StringBuffer(url); - // final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) ); - final StringBuffer adds = new StringBuffer (); - if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); - if ( limit > -1 ) - { - if ( adds.length () > 0 ) - { - adds.append ( "&" ); - } - adds.append ( "limit=" ).append ( limit ); - } - if ( fFilter != null && fFilter.length () > 0 ) - { - try { - if ( adds.length () > 0 ) - { - adds.append ( "&" ); - } - adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e.getMessage() + "....say whaaaat?!"); - } - } - if ( adds.length () > 0 ) - { - contexturl.append ( "?" ).append ( adds.toString () ); - } - - //sender.setSubContext(url.toString()); - return contexturl.toString (); - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getAuthKey() { - return authKey; - } - - public void setAuthKey(String authKey) { - this.authKey = authKey; - } - - public String getAuthDate() { - return authDate; - } - - public void setAuthDate(String authDate) { - this.authDate = authDate; - } - - public String getfFilter() { - return fFilter; - } - - public void setfFilter(String fFilter) { - this.fFilter = fFilter; - } - - private String readRoute(String routeKey) { - - try { - - MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath))); - - } catch (Exception ex) { - fLog.error("Reply Router Error " + ex.toString() ); - } - String routeOffer = MRClientFactory.prop.getProperty(routeKey); - return routeOffer; - } - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse() { - - // fetch with the timeout and limit set in constructor - return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit); - } - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, - int limit) { - final LinkedList<String> msgs = new LinkedList<String>(); - MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); - try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase( - protocolFlag)) { - DMEConfigure(timeoutMs, limit); - - String reply = sender.sendAndWait(timeoutMs + 10000L); - - final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(reply, mrConsumerResponse); - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase( - protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - props.getProperty("Protocol")), timeoutMs, - limit); - - String response = getResponse(urlPath, username, password, - protocolFlag); - - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(response, mrConsumerResponse); - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase( - protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - props.getProperty("Protocol")), timeoutMs, - limit); - - String response = getAuthResponse(urlPath, authKey, authDate, - username, password, protocolFlag); - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(response, mrConsumerResponse); - } - - - - } catch (JSONException e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - } catch (HttpException e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - }catch(DME2Exception e){ - mrConsumerResponse.setResponseCode(e.getErrorCode()); - mrConsumerResponse.setResponseMessage(e.getErrorMessage()); - }catch (Exception e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - } - mrConsumerResponse.setActualMessages(msgs); - return mrConsumerResponse; - } - - private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { - - if(reply.startsWith("{")){ - JSONObject jObject = new JSONObject(reply); - String message = jObject.getString("message"); - int status = jObject.getInt("status"); - - mrConsumerResponse.setResponseCode(Integer.toString(status)); - - if(null != message){ - mrConsumerResponse.setResponseMessage(message); - } - }else if (reply.startsWith("<")){ - mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply)); - mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - }else{ - mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); - } - - } - - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java deleted file mode 100644 index 07128ed..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java +++ /dev/null @@ -1,49 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -enum MRFormat -{ - /** - * Messages are sent using MR's message format. - */ - CAMBRIA - { - public String toString() { return "application/cambria"; } - }, - - /** - * Messages are sent using MR's message format with compression. - */ - CAMBRIA_ZIP - { - public String toString() { return "application/cambria-zip"; } - }, - - /** - * messages are sent as simple JSON objects. - */ - JSON - { - public String toString() { return "application/json"; } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java deleted file mode 100644 index 90b50f6..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java +++ /dev/null @@ -1,260 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.Set; -import java.util.TreeSet; - -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager; - -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; - -public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager -{ - public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException - { - super ( baseUrls ); - } - - @Override - public Set<String> getTopics () throws IOException - { - final TreeSet<String> set = new TreeSet<String> (); - try - { - final JSONObject topicSet = get ( "/topics" ); - final JSONArray a = topicSet.getJSONArray ( "topics" ); - for ( int i=0; i<a.length (); i++ ) - { - set.add ( a.getString ( i ) ); - } - } - catch ( HttpObjectNotFoundException e ) - { - getLog().warn ( "No /topics endpoint on service." ); - } - catch ( JSONException e ) - { - getLog().warn ( "Bad /topics result from service." ); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - return set; - } - - @Override - public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException - { - try - { - final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) ); - return new TopicInfo () - { - @Override - public String getOwner () - { - return topicData.optString ( "owner", null ); - } - - @Override - public String getDescription () - { - return topicData.optString ( "description", null ); - } - - @Override - public Set<String> getAllowedProducers () - { - final JSONObject acl = topicData.optJSONObject ( "writerAcl" ); - if ( acl != null && acl.optBoolean ( "enabled", true ) ) - { - return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); - } - return null; - } - - @Override - public Set<String> getAllowedConsumers () - { - final JSONObject acl = topicData.optJSONObject ( "readerAcl" ); - if ( acl != null && acl.optBoolean ( "enabled", true ) ) - { - return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); - } - return null; - } - }; - } - catch ( JSONException e ) - { - throw new IOException ( e ); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - } - - @Override - public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException - { - final JSONObject o = new JSONObject (); - o.put ( "topicName", topicName ); - o.put ( "topicDescription", topicDescription ); - o.put ( "partitionCount", partitionCount ); - o.put ( "replicationCount", replicationCount ); - post ( "/topics/create", o, false ); - } - - @Override - public void deleteTopic ( String topic ) throws HttpException, IOException - { - delete ( "/topics/" + MRConstants.escape ( topic ) ); - } - - @Override - public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException - { - return null == getAllowedProducers ( topic ); - } - - @Override - public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException - { - return getTopicMetadata ( topic ).getAllowedProducers (); - } - - @Override - public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() ); - } - - @Override - public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException - { - delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) ); - } - - @Override - public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException - { - return null == getAllowedConsumers ( topic ); - } - - @Override - public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException - { - return getTopicMetadata ( topic ).getAllowedConsumers (); - } - - @Override - public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() ); - } - - @Override - public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException - { - delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) ); - } - - @Override - public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException - { - try - { - final JSONObject o = new JSONObject (); - o.put ( "email", email ); - o.put ( "description", description ); - final JSONObject reply = post ( "/apiKeys/create", o, true ); - return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) ); - } - catch ( JSONException e ) - { - // the response doesn't meet our expectation - throw new MRApiException ( "The API key response is incomplete.", e ); - } - } - - @Override - public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) ); - if ( keyEntry == null ) - { - return null; - } - - return new ApiKey () - { - @Override - public String getEmail () - { - final JSONObject aux = keyEntry.optJSONObject ( "aux" ); - if ( aux != null ) - { - return aux.optString ( "email" ); - } - return null; - } - - @Override - public String getDescription () - { - final JSONObject aux = keyEntry.optJSONObject ( "aux" ); - if ( aux != null ) - { - return aux.optString ( "description" ); - } - return null; - } - }; - } - - @Override - public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException - { - final JSONObject o = new JSONObject (); - if ( email != null ) o.put ( "email", email ); - if ( description != null ) o.put ( "description", description ); - patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o ); - } - - @Override - public void deleteCurrentApiKey () throws HttpException, IOException - { - delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) ); - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java deleted file mode 100644 index 10eef5e..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ /dev/null @@ -1,939 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPOutputStream; - -import javax.ws.rs.core.MultivaluedMap; - -import org.apache.http.HttpException; -import org.apache.http.HttpStatus; -import org.json.JSONArray; -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.HostSelector; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants; - -import com.att.aft.dme2.api.DME2Client; -import com.att.aft.dme2.api.DME2Exception; - -public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher -{ - public static class Builder - { - public Builder () - { - } - - public Builder againstUrls ( Collection<String> baseUrls ) - { - fUrls = baseUrls; - return this; - } - - public Builder onTopic ( String topic ) - { - fTopic = topic; - return this; - } - - public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs ) - { - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - return this; - } - - public Builder compress ( boolean compress ) - { - fCompress = compress; - return this; - } - - public Builder httpThreadTime ( int threadOccuranceTime ) - { - this.threadOccuranceTime = threadOccuranceTime; - return this; - } - - public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts ) - { - fAllowSelfSignedCerts = allowSelfSignedCerts; - return this; - } - - public Builder withResponse ( boolean withResponse) - { - fWithResponse = withResponse; - return this; - } - public MRSimplerBatchPublisher build () - { - if(!fWithResponse) - { - try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } else - { - try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } - - } - - private Collection<String> fUrls; - private String fTopic; - private int fMaxBatchSize = 100; - private long fMaxBatchAgeMs = 1000; - private boolean fCompress = false; - private int threadOccuranceTime = 50; - private boolean fAllowSelfSignedCerts = false; - private boolean fWithResponse = false; - - }; - - @Override - public int send ( String partition, String msg ) - { - return send ( new message ( partition, msg ) ); - } - @Override - public int send ( String msg ) - { - return send ( new message ( null, msg ) ); - } - - - @Override - public int send ( message msg ) - { - final LinkedList<message> list = new LinkedList<message> (); - list.add ( msg ); - return send ( list ); - } - - - - @Override - public synchronized int send ( Collection<message> msgs ) - { - if ( fClosed ) - { - throw new IllegalStateException ( "The publisher was closed." ); - } - - for ( message userMsg : msgs ) - { - fPending.add ( new TimestampedMessage ( userMsg ) ); - } - return getPendingMessageCount (); - } - - @Override - public synchronized int getPendingMessageCount () - { - return fPending.size (); - } - - @Override - public void close () - { - try - { - final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); - if ( remains.size() > 0 ) - { - getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. " - + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." ); - } - } - catch ( InterruptedException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); - } - catch ( IOException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); - } - } - - @Override - public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException - { - synchronized ( this ) - { - fClosed = true; - - // stop the background sender - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); - fExec.shutdown (); - } - - final long now = Clock.now (); - final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); - final long timeoutAtMs = now + waitInMs; - - while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 ) - { - send ( true ); - Thread.sleep ( 250 ); - } - - synchronized ( this ) - { - final LinkedList<message> result = new LinkedList<message> (); - fPending.drainTo ( result ); - return result; - } - } - - /** - * Possibly send a batch to the MR server. This is called by the background thread - * and the close() method - * - * @param force - */ - private synchronized void send ( boolean force ) - { - if ( force || shouldSendNow () ) - { - if ( !sendBatch () ) - { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); - - // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + Clock.now (); - } - } - } - - private synchronized boolean shouldSendNow () - { - boolean shouldSend = false; - if ( fPending.size () > 0 ) - { - final long nowMs = Clock.now (); - - shouldSend = ( fPending.size() >= fMaxBatchSize ); - if ( !shouldSend ) - { - final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; - shouldSend = sendAtMs <= nowMs; - } - - // however, wait after an error - shouldSend = shouldSend && nowMs >= fDontSendUntilMs; - } - return shouldSend; - } - - private synchronized boolean sendBatch () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { - return true; - } - - final long nowMs = Clock.now (); - - host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - - - try - { - /*final String contentType = - fCompress ? - MRFormat.CAMBRIA_ZIP.toString () : - MRFormat.CAMBRIA.toString () - ;*/ - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - OutputStream os = baseStream; - final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - - } - os.write (jsonArray.toString().getBytes() ); - os.close(); - - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } - os.close (); - } - - - - final long startMs = Clock.now (); - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - - DME2Configue(); - - Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); - String dmeResponse = sender.sendAndWait(5000L); - - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" - + dmeResponse.toString(); - getLog().info(logLine); - fPending.clear(); - return true; - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { - return false; - } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); - fPending.clear(); - return true; - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { - return false; - } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); - fPending.clear(); - return true; - } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); - } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - } - return false; - } - - public synchronized MRPublisherResponse sendBatchWithResponse () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage("No Messages to send"); - return pubResponse; - } - - final long nowMs = Clock.now (); - - host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - OutputStream os=null; - try - { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - os = baseStream; - final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - - } - os.write (jsonArray.toString().getBytes() ); - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } - } - - - - final long startMs = Clock.now (); - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - - try { - DME2Configue(); - - Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); - - - String dmeResponse = sender.sendAndWait(5000L); - System.out.println("dmeres->"+dmeResponse); - - - pubResponse = createMRPublisherResponse(dmeResponse,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - final String logLine = String.valueOf((Clock.now() - startMs)) - + dmeResponse.toString(); - getLog().info(logLine); - fPending.clear(); - - } - catch (DME2Exception x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(x.getErrorCode()); - pubResponse.setResponseMessage(x.getErrorMessage()); - } catch (URISyntaxException x) { - - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - } catch (Exception x) { - - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - - } - - return pubResponse; - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - - - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); - fPending.clear(); - return pubResponse; - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - - final String logLine = String.valueOf((Clock.now() - startMs)); - getLog().info(logLine); - fPending.clear(); - return pubResponse; - } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - - } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - - } - - finally { - if (fPending.size()>0) { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); - pubResponse.setPendingMsgs(fPending.size()); - } - if (os != null) { - try { - os.close(); - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage("Error in closing Output Stream"); - } - } - } - - return pubResponse; - } - -private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { - - if (reply.isEmpty()) - { - - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - mrPubResponse.setResponseMessage("Please verify the Producer properties"); - } - else if(reply.startsWith("{")) - { - JSONObject jObject = new JSONObject(reply); - if(jObject.has("message") && jObject.has("status")) - { - String message = jObject.getString("message"); - if(null != message) - { - mrPubResponse.setResponseMessage(message); - } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); - } - else - { - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrPubResponse.setResponseMessage(reply); - } - } - else if (reply.startsWith("<")) - { - String responseCode = getHTTPErrorResponseCode(reply); - if( responseCode.contains("403")) - { - responseCode = "403"; - } - mrPubResponse.setResponseCode(responseCode); - mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - } - - return mrPubResponse; - } - - private final String fTopic; - private final int fMaxBatchSize; - private final long fMaxBatchAgeMs; - private final boolean fCompress; - private int threadOccuranceTime; - private boolean fClosed; - private String username; - private String password; - private String host; - - //host selector - private HostSelector fHostSelector = null; - - private final LinkedBlockingQueue<TimestampedMessage> fPending; - private long fDontSendUntilMs; - private final ScheduledThreadPoolExecutor fExec; - - private String latitude; - private String longitude; - private String version; - private String serviceName; - private String env; - private String partner; - private String routeOffer; - private String subContextPath; - private String protocol; - private String methodType; - private String url; - private String dmeuser; - private String dmepassword; - private String contentType; - private static final long sfWaitAfterError = 10000; - private HashMap<String, String> DMETimeOuts; - private DME2Client sender; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String producerFilePath; - private String authKey; - private String authDate; - private String handlers; - private Properties props; - public static String routerFilePath; - public static Map<String, String> headers=new HashMap<String, String>(); - public static MultivaluedMap<String, Object> headersMap; - - - private MRPublisherResponse pubResponse; - - public MRPublisherResponse getPubResponse() { - return pubResponse; - } - public void setPubResponse(MRPublisherResponse pubResponse) { - this.pubResponse = pubResponse; - } - - public static String getRouterFilePath() { - return routerFilePath; - } - - public static void setRouterFilePath(String routerFilePath) { - MRSimplerBatchPublisher.routerFilePath = routerFilePath; - } - - public Properties getProps() { - return props; - } - - public void setProps(Properties props) { - this.props = props; - } - - public String getProducerFilePath() { - return producerFilePath; - } - - public void setProducerFilePath(String producerFilePath) { - this.producerFilePath = producerFilePath; - } - - public String getProtocolFlag() { - return protocolFlag; - } - - public void setProtocolFlag(String protocolFlag) { - this.protocolFlag = protocolFlag; - } - - - private void DME2Configue() throws Exception { - try { - - /* FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); - props.load(reader);*/ - latitude = props.getProperty("Latitude"); - longitude = props.getProperty("Longitude"); - version = props.getProperty("Version"); - serviceName = props.getProperty("ServiceName"); - env = props.getProperty("Environment"); - partner = props.getProperty("Partner"); - routeOffer = props.getProperty("routeOffer"); - subContextPath = props.getProperty("SubContextPath")+fTopic; - /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){ - subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition"); - }*/ - protocol = props.getProperty("Protocol"); - methodType = props.getProperty("MethodType"); - dmeuser = props.getProperty("username"); - dmepassword = props.getProperty("password"); - contentType = props.getProperty("contenttype"); - handlers = props.getProperty("sessionstickinessrequired"); - routerFilePath= props.getProperty("DME2preferredRouterFilePath"); - - /** - * Changes to DME2Client url to use Partner for auto failover between data centers - * When Partner value is not provided use the routeOffer value for auto failover within a cluster - */ - - - String partitionKey = props.getProperty("partition"); - - if (partner != null && !partner.isEmpty() ) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } - } - else if (routeOffer!=null && !routeOffer.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } - } - - DMETimeOuts = new HashMap<String, String>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - DMETimeOuts.put("Content-Type", contentType); - System.setProperty("AFT_LATITUDE", latitude); - System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); - //System.setProperty("DME2.DEBUG", "true"); - // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true"); - //System.out.println("XXXXXX"+url); - - //SSL changes - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - "SSLv3,TLSv1,TLSv1.1"); - System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); - System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - - //SSL changes - - sender = new DME2Client(new URI(url), 5000L); - - sender.setAllowAllHttpReturnCodes(true); - sender.setMethod(methodType); - sender.setSubContext(subContextPath); - sender.setCredentials(dmeuser, dmepassword); - sender.setHeaders(DMETimeOuts); - if(handlers.equalsIgnoreCase("yes")){ - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - }else{ - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler"); - } - } catch (DME2Exception x) { - getLog().warn(x.getMessage(), x); - throw new DME2Exception(x.getErrorCode(),x.getErrorMessage()); - } catch (URISyntaxException x) { - - getLog().warn(x.getMessage(), x); - throw new URISyntaxException(url,x.getMessage()); - } catch (Exception x) { - - getLog().warn(x.getMessage(), x); - throw new Exception(x.getMessage()); - } - } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException - { - super ( hosts ); - - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); - } - - fHostSelector = new HostSelector(hosts, null); - fClosed = false; - fTopic = topic; - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - fCompress = compress; - - fPending = new LinkedBlockingQueue<TimestampedMessage> (); - fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); - pubResponse = new MRPublisherResponse(); - - } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException - { - super ( hosts ); - - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); - } - - fHostSelector = new HostSelector(hosts, null); - fClosed = false; - fTopic = topic; - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - fCompress = compress; - threadOccuranceTime=httpThreadOccurnace; - fPending = new LinkedBlockingQueue<TimestampedMessage> (); - fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); - fExec.scheduleAtFixedRate ( new Runnable() - { - @Override - public void run () - { - send ( false ); - } - }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS ); - } - - private static class TimestampedMessage extends message - { - public TimestampedMessage ( message m ) - { - super ( m ); - timestamp = Clock.now(); - } - public final long timestamp; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getContentType() { - return contentType; - } - - public void setContentType(String contentType) { - this.contentType = contentType; - } - - public String getAuthKey() { - return authKey; - } - - public void setAuthKey(String authKey) { - this.authKey = authKey; - } - - public String getAuthDate() { - return authDate; - } - - public void setAuthDate(String authDate) { - this.authDate = authDate; - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java deleted file mode 100644 index 0006852..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java +++ /dev/null @@ -1,60 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response; - -public class MRConsumerResponse { - - private String responseCode; - - private String responseMessage; - - private Iterable<String> actualMessages; - - - - - public String getResponseCode() { - return responseCode; - } - - public void setResponseCode(String responseCode) { - this.responseCode = responseCode; - } - - public String getResponseMessage() { - return responseMessage; - } - - public void setResponseMessage(String responseMessage) { - this.responseMessage = responseMessage; - } - - public Iterable<String> getActualMessages() { - return actualMessages; - } - - public void setActualMessages(Iterable<String> actualMessages) { - this.actualMessages = actualMessages; - } - - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java deleted file mode 100644 index fd53de5..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java +++ /dev/null @@ -1,66 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response; - -/** - * Response for Publisher - * @author author - * - */ -public class MRPublisherResponse { - private String responseCode; - - private String responseMessage; - - private int pendingMsgs; - - public String getResponseCode() { - return responseCode; - } - - public void setResponseCode(String responseCode) { - this.responseCode = responseCode; - } - - public String getResponseMessage() { - return responseMessage; - } - - public void setResponseMessage(String responseMessage) { - this.responseMessage = responseMessage; - } - - public int getPendingMsgs() { - return pendingMsgs; - } - - public void setPendingMsgs(int pendingMsgs) { - this.pendingMsgs = pendingMsgs; - } - - public String toString() { - return "Response Code:" + this.responseCode + "," - + "Response Message:" + this.responseMessage + "," + "Pending Messages Count" - + this.pendingMsgs; - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java deleted file mode 100644 index a97793b..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; - -//package com.att.aft.dme2.api; - -import java.util.logging.Level; -import java.util.logging.Logger; - -//import com.att.aft.dme2.api.DME2FailoverFaultHandler; -//import com.att.aft.dme2.api.util.DME2Constants; -//import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; -//import com.att.aft.dme2.api.util.LogMessage; -//import com.att.aft.dme2.api.util.LogUtil; -public class DefaultLoggingFailoverFaultHandler /*implements DME2FailoverFaultHandler*/ { - /** The logger. */ - //private static Logger logger = DME2Constants.getLogger(DefaultLoggingFailoverFaultHandler.class.getName()); - -// @Override -// public void handleEndpointFailover(/*DME2ExchangeFaultContext context*/) { -// // LogUtil.INSTANCE.report(logger, Level.WARNING, LogMessage.SEP_FAILOVER, context.getService(),context.getRequestURL(),context.getRouteOffer(),context.getResponseCode(),context.getException()); -// } -// @Override -// /** -// * The DME2Exchange already has a log message when the route offer is failed over. We dont need to log it again here. -// */ -// public void handleRouteOfferFailover(DME2ExchangeFaultContext context) { -// //noop -// -// } -}
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java deleted file mode 100644 index f5c133a..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; - - -import java.util.Map; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; -import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler; -import com.att.aft.dme2.api.util.DME2ExchangeResponseContext; - - - -//public class HeaderReplyHandler implements DME2ReplyHandler { - - public class HeaderReplyHandler implements DME2ExchangeReplyHandler { - - private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - - - @Override public void handleFault(DME2ExchangeFaultContext responseData) { - // TODO Auto-generated method stub - //StaticCache.getInstance().setHandleFaultInvoked(true); - } - @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) { - // TODO Auto-generated method stub - //StaticCache.getInstance().setHandleEndpointFaultInvoked(true); - } -@Override public void handleReply(DME2ExchangeResponseContext responseData) { - - if(responseData != null) { - MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders(); - if (responseData.getResponseHeaders().get("transactionId")!=null) - fLog.info("Transaction Id : " + responseData.getResponseHeaders().get("transactionId")); - - } -} - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java deleted file mode 100644 index bf57836..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; -import java.io.File; -import java.io.FileWriter; -import java.io.InputStream; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.aft.dme2.api.util.DME2ExchangeFaultContext; -import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler; -import com.att.aft.dme2.api.util.DME2ExchangeResponseContext; - -public class PreferredRouteReplyHandler implements DME2ExchangeReplyHandler { - private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () ); - @Override public void handleReply(DME2ExchangeResponseContext responseData) { - - if(responseData != null) { - MRClientFactory.DME2HeadersMap=responseData.getResponseHeaders(); - if (responseData.getResponseHeaders().get("transactionId")!=null) - - fLog.info("Transaction_Id : " + responseData.getResponseHeaders().get("transactionId")); - - if(responseData.getRouteOffer() != null ){ - routeWriter("preferredRouteKey",responseData.getRouteOffer()); - - } - } -} - - @Override public void handleFault(DME2ExchangeFaultContext responseData) { - // TODO Auto-generated method stub - //StaticCache.getInstance().setHandleFaultInvoked(true); - } - @Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) { - // TODO Auto-generated method stub - //StaticCache.getInstance().setHandleEndpointFaultInvoked(true); - } - public void routeWriter(String routeKey, String routeValue){ - - try{ - - FileWriter routeWriter=new FileWriter(new File (MRSimplerBatchPublisher.routerFilePath)); - routeWriter.write(routeKey+"="+routeValue); - routeWriter.close(); - - }catch(Exception ex){ - fLog.error("Reply Router Error " + ex.toString() ); - } - - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java deleted file mode 100644 index 14ab313..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.aft.dme2.api.util.DME2ExchangeRequestContext; -import com.att.aft.dme2.api.util.DME2ExchangeRequestHandler; - -public class PreferredRouteRequestHandler implements DME2ExchangeRequestHandler { - private Logger fLog = LoggerFactory.getLogger(this.getClass().getName()); - - @Override - public void handleRequest(DME2ExchangeRequestContext requestData) { - - if (requestData != null) { - - requestData.setPreferredRouteOffer(readRoute("preferredRouteKey")); - } - } - - public String readRoute(String routeKey) { - - try { - - MRClientFactory.prop.load(MRClientFactory.routeReader); - - } catch (Exception ex) { - fLog.error("Request Router Error " + ex.toString()); - } - return MRClientFactory.prop.getProperty(routeKey); - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java deleted file mode 100644 index fa1075c..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java +++ /dev/null @@ -1,77 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; - -import java.util.Map; - -import javax.ws.rs.core.MultivaluedMap; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; - -public class SimpleExampleConsumer { - - public static void main(String[] args) { - - long count = 0; - long nextReport = 5000; - - final long startMs = System.currentTimeMillis(); - - try { - - final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties"); - while (true) { - for (String msg : cc.fetch()) { - - System.out.println("Message Received: " + msg); - } - // Header for DME2 Call. - MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; - for (String key : headersMap.keySet()) { - System.out.println("Header Key " + key); - System.out.println("Header Value " + headersMap.get(key)); - } - // Header for HTTP Call. - - Map<String, String> - dme2headersMap=MRClientFactory.DME2HeadersMap; for(String key - : dme2headersMap.keySet()) { System.out.println("Header Key " - + key); System.out.println("Header Value " + - dme2headersMap.get(key)); } - - if (count > nextReport) { - nextReport += 5000; - - final long endMs = System.currentTimeMillis(); - final long elapsedMs = endMs - startMs; - final double elapsedSec = elapsedMs / 1000.0; - final double eps = count / elapsedSec; - System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps"); - } - } - } catch (Exception x) { - System.err.println(x.getClass().getName() + ": " + x.getMessage()); - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java deleted file mode 100644 index c4b006f..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java +++ /dev/null @@ -1,134 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client; - - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.core.MultivaluedMap; - -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message; - -/** - * An example of how to use the Java publisher. - * - * @author author - */ -public class SimpleExamplePublisher { - static String content = null; - static String messageSize = null; - static String transport = null; - static String messageCount = null; - - public void publishMessage(String producerFilePath) throws IOException, InterruptedException, Exception { - - // create our publisher - - // publish some messages - - - StringBuilder sb = new StringBuilder(); - final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher(producerFilePath); - - if (content.equalsIgnoreCase("text/plain")) { - for (int i = 0; i < Integer.parseInt(messageCount); i++) { - for (int j = 0; j < Integer.parseInt(messageSize); j++) { - sb.append("T"); - } - - pub.send(sb.toString()); - } - } else if (content.equalsIgnoreCase("application/cambria")) { - for (int i = 0; i < Integer.parseInt(messageCount); i++) { - for (int j = 0; j < Integer.parseInt(messageSize); j++) { - sb.append("C"); - } - - pub.send("Key", sb.toString()); - } - } else if (content.equalsIgnoreCase("application/json")) { - for (int i = 0; i < Integer.parseInt(messageCount); i++) { - - final JSONObject msg12 = new JSONObject(); - msg12.put("Name", "DMaaP Reference Client to Test jason Message"); - - pub.send(msg12.toString()); - - } - } - - // ... - - // close the publisher to make sure everything's sent before exiting. - // The batching - // publisher interface allows the app to get the set of unsent messages. - // It could - // write them to disk, for example, to try to send them later. - /* final List<message> stuck = pub.close(20, TimeUnit.SECONDS); - if (stuck.size() > 0) { - System.err.println(stuck.size() + " messages unsent"); - } else { - System.out.println("Clean exit; all messages sent."); - }*/ - - if (transport.equalsIgnoreCase("HTTP")) { - MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; - for (String key : headersMap.keySet()) { - System.out.println("Header Key " + key); - System.out.println("Header Value " + headersMap.get(key)); - } - } else { - Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap; - for (String key : dme2headersMap.keySet()) { - System.out.println("Header Key " + key); - System.out.println("Header Value " + dme2headersMap.get(key)); - } - } - - } - - public static void main(String[] args) throws InterruptedException, Exception { - - String producerFilePath = args[0]; - content = args[1]; - messageSize = args[2]; - transport = args[3]; - messageCount = args[4]; - /*String producerFilePath = null; - content = null; - messageSize =null; - transport =null; - messageCount = null;*/ - SimpleExamplePublisher publisher = new SimpleExamplePublisher(); - - publisher.publishMessage("D:\\SG\\producer.properties"); - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java deleted file mode 100644 index 96389f5..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java +++ /dev/null @@ -1,159 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -/** - * - */ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.logging; - -import java.io.IOException; - -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.helpers.LogLog; -import org.apache.log4j.spi.LoggingEvent; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher; - -/** - * @author author - * - */ -public class MRAppender extends AppenderSkeleton { - - private MRPublisher fPublisher; - - //Provided through log4j configuration - private String topic; - private String partition; - private String hosts; - private int maxBatchSize = 1; - private int maxAgeMs = 1000; - private boolean compress = false; - - /** - * - */ - public MRAppender() { - super(); - } - - /** - * @param isActive - */ - public MRAppender(boolean isActive) { - super(isActive); - } - - /* (non-Javadoc) - * @see org.apache.log4j.Appender#close() - */ - @Override - public void close() { - if (!this.closed) { - this.closed = true; - fPublisher.close(); - } - } - - /* (non-Javadoc) - * @see org.apache.log4j.Appender#requiresLayout() - */ - @Override - public boolean requiresLayout() { - return false; - } - - /* (non-Javadoc) - * @see org.apache.log4j.AppenderSkeleton#append(org.apache.log4j.spi.LoggingEvent) - */ - @Override - protected void append(LoggingEvent event) { - final String message; - - if (this.layout == null) { - message = event.getRenderedMessage(); - } else { - message = this.layout.format(event); - } - - try { - fPublisher.send(partition, message); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void activateOptions() { - if (hosts != null && topic != null && partition != null) { - fPublisher = MRClientFactory.createBatchingPublisher(hosts.split(","), topic, maxBatchSize, maxAgeMs, compress); - } else { - LogLog.error("The Hosts, Topic, and Partition parameter are required to create a MR Log4J Appender"); - } - } - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getPartition() { - return partition; - } - - public void setPartition(String partition) { - this.partition = partition; - } - - public String getHosts() { - return hosts; - } - - public void setHosts(String hosts) { - this.hosts = hosts; - } - - public int getMaxBatchSize() { - return maxBatchSize; - } - - public void setMaxBatchSize(int maxBatchSize) { - this.maxBatchSize = maxBatchSize; - } - - public int getMaxAgeMs() { - return maxAgeMs; - } - - public void setMaxAgeMs(int maxAgeMs) { - this.maxAgeMs = maxAgeMs; - } - - public boolean isCompress() { - return compress; - } - - public void setCompress(boolean compress) { - this.compress = compress; - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java deleted file mode 100644 index 20eacd8..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java +++ /dev/null @@ -1,87 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message; - -/** - * A simple publisher that reads from std in, sending each line as a message. - * @author author - */ -public class ConsolePublisher -{ - public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException - { - // read the hosts(s) from the command line - final String hosts = ( args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com" ); - - // read the topic name from the command line - final String topic = ( args.length > 1 ? args[1] : "TEST-TOPIC" ); - - // read the topic name from the command line - final String partition = ( args.length > 2 ? args[2] : UUID.randomUUID ().toString () ); - - // set up some batch limits and the compression flag - final int maxBatchSize = 100; - final long maxAgeMs = 250; - final boolean withGzip = false; - - // create our publisher - final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip ); - - final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) ); - try - { - String line = null; - while ( ( line = cin.readLine () ) != null ) - { - pub.send ( partition, line ); - } - } - finally - { - List<message> leftovers = null; - try - { - leftovers = pub.close ( 10, TimeUnit.SECONDS ); - } - catch ( InterruptedException e ) - { - System.err.println ( "Send on close interrupted." ); - } - for ( message m : leftovers ) - { - System.err.println ( "Unsent message: " + m.fMsg ); - } - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java deleted file mode 100644 index c60657b..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java +++ /dev/null @@ -1,46 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -/** - * - */ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -/** - * @author author - * - */ -public enum ProtocolTypeConstants { - - DME2("DME2"), - AAF_AUTH("HTTPAAF"), - AUTH_KEY("HTTPAUTH"); - - private String value; - - private ProtocolTypeConstants(String value) { - this.value = value; - } - - public String getValue() { - return value; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java deleted file mode 100644 index 79bf2ed..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java +++ /dev/null @@ -1,86 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -import java.io.IOException; -import java.util.LinkedList; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SampleConsumer { - public static void main ( String[] args ) - { - final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); - - - LOG.info("Sample Consumer Class executing"); - final String topic = "org.onap.dmaap.messagerouter.dmaapclient.app.dmaap.mr.testingTopic"; - final String url = ( args.length > 1 ? args[1] : "localhost:8181" ); - final String group = ( args.length > 2 ? args[2] :"grp" ); - /*final String id = ( args.length > 3 ? args[3] : "0" );*/ - final String id = ( args.length > 3 ? args[3] : "1" ); - - long count = 0; - long nextReport = 5000; - - final long startMs = System.currentTimeMillis (); - - final LinkedList<String> urlList = new LinkedList<String> (); - for ( String u : url.split ( "," ) ) - { - urlList.add ( u ); - } - - final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ); - try - { - while ( true ) - { - for ( String msg : cc.fetch () ) - { - //System.out.println ( "" + (++count) + ": " + msg ); - LOG.info ( "" + (++count) + ": " + msg ); - } - - if ( count > nextReport ) - { - nextReport += 5000; - - final long endMs = System.currentTimeMillis (); - final long elapsedMs = endMs - startMs; - final double elapsedSec = elapsedMs / 1000.0; - final double eps = count / elapsedSec; - //System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); - LOG.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); - } - LOG.info ( "" + (++count) + ": consumed message" ); - } - } - catch ( Exception x ) - { - System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java deleted file mode 100644 index 5ed3a2b..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java +++ /dev/null @@ -1,85 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientBuilders.PublisherBuilder; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SamplePublisher { - public static void main ( String[] args ) throws IOException, InterruptedException - { - final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); - // read the hosts(s) from the command line - final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" ); - - // read the topic name from the command line - //final String topic = ( args.length > 1 ? args[1] : "MY-EXAMPLE-TOPIC" ); - final String topic = ( args.length > 1 ? args[1] : "org.onap.dmaap.messagerouter.dmaapclient.app.dmaap.mr.testingTopic" ); - - // set up some batch limits and the compression flag - final int maxBatchSize = 100; - final int maxAgeMs = 250; - final boolean withGzip = false; - - // create our publisher - - final MRBatchingPublisher pub = new PublisherBuilder (). - usingHosts ( hosts ). - onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs). - authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ). - build () - ; - // publish some messages - final JSONObject msg1 = new JSONObject (); - msg1.put ( "name", "tttttttttttttttt" ); - msg1.put ( "greeting", "ooooooooooooooooo" ); - pub.send ( "MyPartitionKey", msg1.toString () ); - - final JSONObject msg2 = new JSONObject (); - msg2.put ( "now", System.currentTimeMillis () ); - pub.send ( "MyOtherPartitionKey", msg2.toString () ); - - // ... - - // close the publisher to make sure everything's sent before exiting. The batching - // publisher interface allows the app to get the set of unsent messages. It could - // write them to disk, for example, to try to send them later. - final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); - if ( stuck.size () > 0 ) - { - LOG.warn ( stuck.size() + " messages unsent" ); - } - else - { - LOG.info ( "Clean exit; all messages sent." ); - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java deleted file mode 100644 index fe43802..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java +++ /dev/null @@ -1,86 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - - - -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.util.Properties; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; - -public class SimpleExampleConsumer -{ - - static FileWriter routeWriter= null; - static Properties props=null; - static FileReader routeReader=null; - public static void main ( String[] args ) - { - - long count = 0; - long nextReport = 5000; - - final long startMs = System.currentTimeMillis (); - - try - { - String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; - - - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File (routeFilePath)); - } - routeReader= new FileReader(new File (routeFilePath)); - props= new Properties(); - final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" ); - while ( true ) - { - for ( String msg : cc.fetch () ) - { - //System.out.println ( "" + (++count) + ": " + msg ); - System.out.println(msg); - } - - if ( count > nextReport ) - { - nextReport += 5000; - - final long endMs = System.currentTimeMillis (); - final long elapsedMs = endMs - startMs; - final double elapsedSec = elapsedMs / 1000.0; - final double eps = count / elapsedSec; - System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); - } - } - } - catch ( Exception x ) - { - System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); - } - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java deleted file mode 100644 index 836fb90..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java +++ /dev/null @@ -1,90 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.util.Properties; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse; - -public class SimpleExampleConsumerWithReturnResponse { - - - static FileWriter routeWriter= null; - static Properties props=null; - static FileReader routeReader=null; - public static void main ( String[] args ) - { - - long count = 0; - long nextReport = 5000; - - final long startMs = System.currentTimeMillis (); - - try - { - String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; - - - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File (routeFilePath)); - } - routeReader= new FileReader(new File (routeFilePath)); - props= new Properties(); - final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" ); - while ( true ) - { - MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse(); - System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode()); - - System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage()); - - System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages()); - /*for ( String msg : mrConsumerResponse.getActualMessages() ) - { - //System.out.println ( "" + (++count) + ": " + msg ); - System.out.println(msg); - }*/ - if ( count > nextReport ) - { - nextReport += 5000; - - final long endMs = System.currentTimeMillis (); - final long elapsedMs = endMs - startMs; - final double elapsedSec = elapsedMs / 1000.0; - final double eps = count / elapsedSec; - System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); - } - } - } - catch ( Exception x ) - { - System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); - } - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java deleted file mode 100644 index 8579e68..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java +++ /dev/null @@ -1,97 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; - -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message; - -/** - * An example of how to use the Java publisher. - * @author author - */ -public class SimpleExamplePublisher -{ - static FileWriter routeWriter= null; - static Properties props=null; - static FileReader routeReader=null; - public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception - { - - // create our publisher - final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); - // publish some messages - final JSONObject msg1 = new JSONObject (); - msg1.put ( "Name", "Sprint" ); - //msg1.put ( "greeting", "Hello .." ); - pub.send ( "First cambria messge" ); - pub.send ( "MyPartitionKey", msg1.toString () ); - - final JSONObject msg2 = new JSONObject (); - //msg2.put ( "mrclient1", System.currentTimeMillis () ); - - - // ... - - // close the publisher to make sure everything's sent before exiting. The batching - // publisher interface allows the app to get the set of unsent messages. It could - // write them to disk, for example, to try to send them later. - final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); - if ( stuck.size () > 0 ) - { - System.err.println ( stuck.size() + " messages unsent" ); - } - else - { - System.out.println ( "Clean exit; all messages sent." ); - } - } - - public static void main(String []args) throws InterruptedException, Exception{ - - String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; - - SimpleExamplePublisher publisher = new SimpleExamplePublisher(); - - - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File (routeFilePath)); - } - routeReader= new FileReader(new File (routeFilePath)); - props= new Properties(); - publisher.publishMessage("/src/main/resources/dme2/producer.properties"); - } - -} - diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java deleted file mode 100644 index 74b1462..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java +++ /dev/null @@ -1,84 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse; - /** - *An example of how to use the Java publisher. - * @author author - * - */ - public class SimpleExamplePublisherWithResponse - { - static FileWriter routeWriter= null; - static Properties props=null; - static FileReader routeReader=null; - - public static void main(String []args) throws InterruptedException, Exception{ - - String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; - String msgCount = args[0]; - SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse(); - File fo= new File(routeFilePath); - if(!fo.exists()){ - routeWriter=new FileWriter(new File (routeFilePath)); - } - routeReader= new FileReader(new File (routeFilePath)); - props= new Properties(); - int i=0; - while (i< Integer.valueOf(msgCount)) - { - publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount)); - i++; - } - } - - public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException, Exception - { - // create our publisher - final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true); - // publish some messages - final JSONObject msg1 = new JSONObject (); - - msg1.put ( "Partition:1", "Message:"+count); - msg1.put ( "greeting", "Hello .." ); - - - pub.send ( "1", msg1.toString()); - pub.send ( "1", msg1.toString()); - - MRPublisherResponse res= pub.sendBatchWithResponse(); - - System.out.println("Pub response->"+res.toString()); - } - - - } diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java deleted file mode 100644 index eca5874..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java +++ /dev/null @@ -1,183 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.support; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; - -/** - * A helper for unit testing systems that use a MRPublisher. When setting - * up your test, inject an instance into MRClientFactory to have it return - * the mock client. - * - * @author author - * - */ -public class MRBatchingPublisherMock implements MRBatchingPublisher -{ - public class Entry - { - public Entry ( String partition, String msg ) - { - fPartition = partition; - fMessage = msg; - } - - @Override - public String toString () - { - return fMessage; - } - - public final String fPartition; - public final String fMessage; - } - - public MRBatchingPublisherMock () - { - fCaptures = new LinkedList<Entry> (); - } - - public interface Listener - { - void onMessage ( Entry e ); - } - public void addListener ( Listener listener ) - { - fListeners.add ( listener ); - } - - public List<Entry> getCaptures () - { - return getCaptures ( new MessageFilter () { @Override public boolean match ( String msg ) { return true; } } ); - } - - public interface MessageFilter - { - boolean match ( String msg ); - } - - public List<Entry> getCaptures ( MessageFilter filter ) - { - final LinkedList<Entry> result = new LinkedList<Entry> (); - for ( Entry capture : fCaptures ) - { - if ( filter.match ( capture.fMessage ) ) - { - result.add ( capture ); - } - } - return result; - } - - public int received () - { - return fCaptures.size(); - } - - public void reset () - { - fCaptures.clear (); - } - - @Override - public int send ( String partition, String msg ) - { - final Entry e = new Entry ( partition, msg ); - - fCaptures.add ( e ); - for ( Listener l : fListeners ) - { - l.onMessage ( e ); - } - return 1; - } - - @Override - public int send ( message msg ) - { - return send ( msg.fPartition, msg.fMsg ); - } - @Override - public int send ( String msg ) - { - return 1; - - } - - @Override - public int send ( Collection<message> msgs ) - { - int sum = 0; - for ( message m : msgs ) - { - sum += send ( m ); - } - return sum; - } - - @Override - public int getPendingMessageCount () - { - return 0; - } - - @Override - public List<message> close ( long timeout, TimeUnit timeoutUnits ) - { - return new LinkedList<message> (); - } - - @Override - public void close () - { - } - - @Override - public void setApiCredentials ( String apiKey, String apiSecret ) - { - } - - @Override - public void clearApiCredentials () - { - } - - @Override - public void logTo ( Logger log ) - { - } - - private final LinkedList<Entry> fCaptures; - private LinkedList<Listener> fListeners = new LinkedList<Listener> (); - @Override - public MRPublisherResponse sendBatchWithResponse() { - // TODO Auto-generated method stub - return null; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java deleted file mode 100644 index 9728053..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java +++ /dev/null @@ -1,167 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.support; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse; -import org.slf4j.Logger; - -/** - * A helper for unit testing systems that use a MRConsumer. When setting - * up your test, inject an instance into MRClientFactory to have it return - * the mock client. - * - * @author author - * - */ -public class MRConsumerMock implements MRConsumer -{ - public class Entry - { - public Entry ( long waitMs, int statusCode, List<String> msgs ) - { - fWaitMs = waitMs; - fStatusCode = statusCode; - fStatusMsg = null; - fMsgs = new LinkedList<String> ( msgs ); - } - - public Entry ( long waitMs, int statusCode, String statusMsg ) - { - fWaitMs = waitMs; - fStatusCode = statusCode; - fStatusMsg = statusMsg; - fMsgs = null; - } - - public LinkedList<String> run () throws IOException - { - try - { - Thread.sleep ( fWaitMs ); - if ( fStatusCode >= 200 && fStatusCode <= 299 ) - { - return fMsgs; - } - throw new IOException ( "" + fStatusCode + " " + fStatusMsg ); - } - catch ( InterruptedException e ) - { - throw new IOException ( e ); - } - } - - private final long fWaitMs; - private final int fStatusCode; - private final String fStatusMsg; - private final LinkedList<String> fMsgs; - } - - public MRConsumerMock () - { - fReplies = new LinkedList<Entry> (); - } - - @Override - public void close () - { - } - - @Override - public void setApiCredentials ( String apiKey, String apiSecret ) - { - } - - @Override - public void clearApiCredentials () - { - } - - public synchronized void add ( Entry e ) - { - fReplies.add ( e ); - } - - public void addImmediateMsg ( String msg ) - { - addDelayedMsg ( 0, msg ); - } - - public void addDelayedMsg ( long delay, String msg ) - { - final LinkedList<String> list = new LinkedList<String> (); - list.add ( msg ); - add ( new Entry ( delay, 200, list ) ); - } - - public void addImmediateMsgGroup ( List<String> msgs ) - { - addDelayedMsgGroup ( 0, msgs ); - } - - public void addDelayedMsgGroup ( long delay, List<String> msgs ) - { - final LinkedList<String> list = new LinkedList<String> ( msgs ); - add ( new Entry ( delay, 200, list ) ); - } - - public void addImmediateError ( int statusCode, String statusText ) - { - add ( new Entry ( 0, statusCode, statusText ) ); - } - - @Override - public Iterable<String> fetch () throws IOException - { - return fetch ( -1, -1 ); - } - - @Override - public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException - { - return fReplies.size () > 0 ? fReplies.removeFirst ().run() : new LinkedList<String>(); - } - - @Override - public void logTo ( Logger log ) - { - } - - private final LinkedList<Entry> fReplies; - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse() { - // TODO Auto-generated method stub - return null; - } - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, - int limit) { - // TODO Auto-generated method stub - return null; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java deleted file mode 100644 index f15c3ae..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java +++ /dev/null @@ -1,135 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.IOException; -import java.io.PrintStream; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient.MRApiException; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager.ApiKey; - -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class ApiKeyCommand implements Command<MRCommandContext> -{ - - @Override - public String[] getMatches () - { - return new String[]{ - "key (create|update) (\\S*) (\\S*)", - "key (list) (\\S*)", - "key (revoke)", - }; - } - - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - if ( !context.checkClusterReady () ) - { - throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); - } - } - - @Override - public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException - { - final MRIdentityManager tm = MRClientFactory.createIdentityManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() ); - context.applyTracer ( tm ); - - try - { - if ( parts[0].equals ( "list" ) ) - { - final ApiKey key = tm.getApiKey ( parts[1] ); - if ( key != null ) - { - out.println ( "email: " + key.getEmail () ); - out.println ( "description: " + key.getDescription () ); - } - else - { - out.println ( "No key returned" ); - } - } - else if ( parts[0].equals ( "create" ) ) - { - final ApiCredential ac = tm.createApiKey ( parts[1], parts[2] ); - if ( ac != null ) - { - out.println ( " key: " + ac.getApiKey () ); - out.println ( "secret: " + ac.getApiSecret () ); - } - else - { - out.println ( "No credential returned?" ); - } - } - else if ( parts[0].equals ( "update" ) ) - { - tm.updateCurrentApiKey ( parts[1], parts[2] ); - out.println ( "Updated" ); - } - else if ( parts[0].equals ( "revoke" ) ) - { - tm.deleteCurrentApiKey (); - out.println ( "Updated" ); - } - } - catch ( HttpObjectNotFoundException e ) - { - out.println ( "Object not found: " + e.getMessage () ); - } - catch ( HttpException e ) - { - out.println ( "HTTP exception: " + e.getMessage () ); - } - catch ( MRApiException e ) - { - out.println ( "API exception: " + e.getMessage () ); - } - catch ( IOException e ) - { - out.println ( "IO exception: " + e.getMessage () ); - } - finally - { - tm.close (); - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "key create <email> <description>" ); - out.println ( "key update <email> <description>" ); - out.println ( "key list <key>" ); - out.println ( "key revoke" ); - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java deleted file mode 100644 index 0845432..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java +++ /dev/null @@ -1,69 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.PrintStream; - -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class AuthCommand implements Command<MRCommandContext> -{ - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - } - - @Override - public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException - { - if ( parts.length > 0 ) - { - context.setAuth ( parts[0], parts[1] ); - out.println ( "Now authenticating with " + parts[0] ); - } - else - { - context.clearAuth (); - out.println ( "No longer authenticating." ); - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "auth <apiKey> <apiSecret>" ); - out.println ( "\tuse these credentials on subsequent transactions" ); - out.println ( "noauth" ); - out.println ( "\tdo not use credentials on subsequent transactions" ); - } - - @Override - public String[] getMatches () - { - return new String[] - { - "auth (\\S*) (\\S*)", - "noauth" - }; - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java deleted file mode 100644 index 69198d4..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java +++ /dev/null @@ -1,81 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.PrintStream; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl; - -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class ClusterCommand implements Command<MRCommandContext> -{ - - @Override - public String[] getMatches () - { - return new String[]{ - "cluster", - "cluster (\\S*)?", - }; - } - - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - } - - @Override - public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException - { - if ( parts.length == 0 ) - { - for ( String host : context.getCluster () ) - { - out.println ( host ); - } - } - else - { - context.clearCluster (); - for ( String part : parts ) - { - String[] hosts = part.trim().split ( "\\s+" ); - for ( String host : hosts ) - { - for ( String splitHost : MRConsumerImpl.stringToList(host) ) - { - context.addClusterHost ( splitHost ); - } - } - } - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "cluster host1 host2 ..." ); - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java deleted file mode 100644 index 7b11573..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java +++ /dev/null @@ -1,101 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.util.Collection; -import java.util.LinkedList; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient; - -import com.att.nsa.apiClient.http.HttpClient; -import com.att.nsa.apiClient.http.HttpTracer; -import com.att.nsa.cmdtool.CommandContext; - -public class MRCommandContext implements CommandContext -{ - public MRCommandContext () - { - fApiKey = null; - fApiPwd = null; - - fCluster = new LinkedList<String> (); - fCluster.add ( "localhost" ); - } - - @Override - public void requestShutdown () - { - fShutdown = true; - } - - @Override - public boolean shouldContinue () - { - return !fShutdown; - } - - public void setAuth ( String key, String pwd ) { fApiKey = key; fApiPwd = pwd; } - public void clearAuth () { setAuth(null,null); } - - public boolean checkClusterReady () - { - return ( fCluster.size () != 0 ); - } - - public Collection<String> getCluster () - { - return new LinkedList<String> ( fCluster ); - } - - public void clearCluster () - { - fCluster.clear (); - } - - public void addClusterHost ( String host ) - { - fCluster.add ( host ); - } - - public String getApiKey () { return fApiKey; } - public String getApiPwd () { return fApiPwd; } - - public void useTracer ( HttpTracer t ) - { - fTracer = t; - } - public void noTracer () { fTracer = null; } - - public void applyTracer ( MRClient cc ) - { - if ( cc instanceof HttpClient && fTracer != null ) - { - ((HttpClient)cc).installTracer ( fTracer ); - } - } - - private boolean fShutdown; - private String fApiKey; - private String fApiPwd; - private final LinkedList<String> fCluster; - private HttpTracer fTracer = null; -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java deleted file mode 100644 index 563315e..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java +++ /dev/null @@ -1,50 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.IOException; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRClientVersionInfo; - -import com.att.nsa.cmdtool.CommandLineTool; - -public class MRTool extends CommandLineTool<MRCommandContext> -{ - protected MRTool () - { - super ( "MR Tool (" + MRClientVersionInfo.getVersion () + ")", "MR> " ); - - registerCommand ( new ApiKeyCommand () ); - registerCommand ( new AuthCommand () ); - registerCommand ( new ClusterCommand () ); - registerCommand ( new MessageCommand () ); - registerCommand ( new TopicCommand () ); - registerCommand ( new TraceCommand () ); - } - - public static void main ( String[] args ) throws IOException - { - final MRTool ct = new MRTool (); - final MRCommandContext ccc = new MRCommandContext (); - ct.runFromMain ( args, ccc ); - } -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java deleted file mode 100644 index afee95f..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java +++ /dev/null @@ -1,129 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientBuilders.PublisherBuilder; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message; - -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class MessageCommand implements Command<MRCommandContext> -{ - - @Override - public String[] getMatches () - { - return new String[]{ - "(post) (\\S*) (\\S*) (.*)", - "(read) (\\S*) (\\S*) (\\S*)", - }; - } - - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - if ( !context.checkClusterReady () ) - { - throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); - } - } - - @Override - public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException - { - if ( parts[0].equalsIgnoreCase ( "read" )) - { - final MRConsumer cc = MRClientFactory.createConsumer ( context.getCluster (), parts[1], parts[2], parts[3], - -1, -1, null, context.getApiKey(), context.getApiPwd() ); - context.applyTracer ( cc ); - try - { - for ( String msg : cc.fetch () ) - { - out.println ( msg ); - } - } - catch ( Exception e ) - { - out.println ( "Problem fetching messages: " + e.getMessage() ); - } - finally - { - cc.close (); - } - } - else - { - final MRBatchingPublisher pub = new PublisherBuilder (). - usingHosts ( context.getCluster () ). - onTopic ( parts[1] ). - authenticatedBy ( context.getApiKey(), context.getApiPwd() ). - build () - ; - try - { - pub.send ( parts[2], parts[3] ); - } - catch ( IOException e ) - { - out.println ( "Problem sending message: " + e.getMessage() ); - } - finally - { - List<message> left = null; - try - { - left = pub.close ( 500, TimeUnit.MILLISECONDS ); - } - catch ( IOException e ) - { - out.println ( "Problem sending message: " + e.getMessage() ); - } - catch ( InterruptedException e ) - { - out.println ( "Problem sending message: " + e.getMessage() ); - } - if ( left != null && left.size () > 0 ) - { - out.println ( left.size() + " messages not sent." ); - } - } - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "post <topicName> <partition> <message>" ); - out.println ( "read <topicName> <consumerGroup> <consumerId>" ); - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java deleted file mode 100644 index 6bc6c14..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java +++ /dev/null @@ -1,210 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Set; - -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager; -import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager.TopicInfo; - -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class TopicCommand implements Command<MRCommandContext> -{ - - @Override - public String[] getMatches () - { - return new String[]{ - "topic (list)", - "topic (list) (\\S*)", - "topic (create) (\\S*) (\\S*) (\\S*)", - "topic (grant|revoke) (read|write) (\\S*) (\\S*)", - }; - } - - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - if ( !context.checkClusterReady () ) - { - throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." ); - } - } - - @Override - public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException - { - final MRTopicManager tm = MRClientFactory.createTopicManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() ); - context.applyTracer ( tm ); - - try - { - if ( parts[0].equals ( "list" ) ) - { - try - { - if ( parts.length == 1 ) - { - for ( String topic : tm.getTopics () ) - { - out.println ( topic ); - } - } - else - { - final TopicInfo ti = tm.getTopicMetadata ( parts[1] ); - - final String owner = ti.getOwner (); - out.println ( " owner: " + ( owner == null ? "<none>" : owner ) ); - - final String desc = ti.getDescription (); - out.println ( "description: " + ( desc == null ? "<none>" : desc ) ); - - final Set<String> prods = ti.getAllowedProducers (); - if ( prods != null ) - { - out.println ( " write ACL: " ); - for ( String key : prods ) - { - out.println ( "\t" + key ); - } - } - else - { - out.println ( " write ACL: <not active>" ); - } - - final Set<String> cons = ti.getAllowedConsumers (); - if ( cons != null ) - { - out.println ( " read ACL: " ); - for ( String key : cons ) - { - out.println ( "\t" + key ); - } - } - else - { - out.println ( " read ACL: <not active>" ); - } - } - } - catch ( IOException x ) - { - out.println ( "Problem with request: " + x.getMessage () ); - } - catch ( HttpObjectNotFoundException e ) - { - out.println ( "Not found: " + e.getMessage () ); - } - } - else if ( parts[0].equals ( "create" ) ) - { - try - { - final int partitions = Integer.parseInt ( parts[2] ); - final int replicas = Integer.parseInt ( parts[3] ); - - tm.createTopic ( parts[1], "", partitions, replicas ); - } - catch ( HttpException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - catch ( IOException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - catch ( NumberFormatException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - } - else if ( parts[0].equals ( "grant" ) ) - { - try - { - if ( parts[1].equals ( "write" ) ) - { - tm.allowProducer ( parts[2], parts[3] ); - } - else if ( parts[1].equals ( "read" ) ) - { - tm.allowConsumer ( parts[2], parts[3] ); - } - } - catch ( HttpException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - catch ( IOException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - } - else if ( parts[0].equals ( "revoke" ) ) - { - try - { - if ( parts[1].equals ( "write" ) ) - { - tm.revokeProducer ( parts[2], parts[3] ); - } - else if ( parts[1].equals ( "read" ) ) - { - tm.revokeConsumer ( parts[2], parts[3] ); - } - } - catch ( HttpException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - catch ( IOException e ) - { - out.println ( "Problem with request: " + e.getMessage () ); - } - } - } - finally - { - tm.close (); - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "topic list" ); - out.println ( "topic list <topicName>" ); - out.println ( "topic create <topicName> <partitions> <replicas>" ); - out.println ( "topic grant write|read <topicName> <apiKey>" ); - out.println ( "topic revoke write|read <topicName> <apiKey>" ); - } - -} diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java deleted file mode 100644 index 280c0ad..0000000 --- a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java +++ /dev/null @@ -1,118 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools; - -import java.io.PrintStream; -import java.net.URI; -import java.util.List; -import java.util.Map; - -import com.att.nsa.apiClient.http.HttpTracer; -import com.att.nsa.cmdtool.Command; -import com.att.nsa.cmdtool.CommandNotReadyException; - -public class TraceCommand implements Command<MRCommandContext> -{ - @Override - public void checkReady ( MRCommandContext context ) throws CommandNotReadyException - { - } - - @Override - public void execute ( String[] parts, MRCommandContext context, final PrintStream out ) throws CommandNotReadyException - { - if ( parts[0].equalsIgnoreCase ( "on" )) - { - context.useTracer ( new HttpTracer () - { - @Override - public void outbound ( URI uri, Map<String, List<String>> headers, String method, byte[] entity ) - { - out.println ( kLineBreak ); - out.println ( ">>> " + method + " " + uri.toString() ); - for ( Map.Entry<String,List<String>> e : headers.entrySet () ) - { - final StringBuffer vals = new StringBuffer (); - for ( String val : e.getValue () ) - { - if ( vals.length () > 0 ) vals.append ( ", " ); - vals.append ( val ); - } - out.println ( ">>> " + e.getKey () + ": " + vals.toString() ); - } - if ( entity != null ) - { - out.println (); - out.println ( new String ( entity ) ); - } - out.println ( kLineBreak ); - } - - @Override - public void inbound ( Map<String, List<String>> headers, int statusCode, String responseLine, byte[] entity ) - { - out.println ( kLineBreak ); - out.println ( "<<< " + responseLine ); - for ( Map.Entry<String,List<String>> e : headers.entrySet () ) - { - final StringBuffer vals = new StringBuffer (); - for ( String val : e.getValue () ) - { - if ( vals.length () > 0 ) vals.append ( ", " ); - vals.append ( val ); - } - out.println ( "<<< " + e.getKey () + ": " + vals.toString() ); - } - if ( entity != null ) - { - out.println (); - out.println ( new String ( entity ) ); - } - out.println ( kLineBreak ); - } - } ); - } - else - { - context.noTracer (); - } - } - - @Override - public void displayHelp ( PrintStream out ) - { - out.println ( "trace on|off" ); - out.println ( "\tWhen trace is on, HTTP interaction is printed to the console." ); - } - - @Override - public String[] getMatches () - { - return new String[] - { - "trace (on)", - "trace (off)" - }; - } - - private static final String kLineBreak = "======================================================================"; -} |