aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java363
1 files changed, 190 insertions, 173 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
index 91e10e0..e8e7003 100644
--- a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
+++ b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java
@@ -5,12 +5,13 @@
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Modifications Copyright © 2018 IBM.
+ * Modifications Copyright © 2021 Orange.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,30 +20,31 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client;
import java.net.MalformedURLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.TreeSet;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRMetaClient;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A collection of builders for various types of MR API clients
- *
+ * A collection of builders for various types of MR API clients.
+ *
* @author author
*/
-public class MRClientBuilders
-{
- private final static String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name.";
+public class MRClientBuilders {
+ private static final Logger logger = LoggerFactory.getLogger(MRClientBuilders.class);
+
+ private static final String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name.";
/**
* Instantiates MRClientBuilders.
@@ -50,115 +52,124 @@ public class MRClientBuilders
private MRClientBuilders() {
// prevent instantiation
}
-
+
/**
- * A builder for a topic Consumer
+ * A builder for a topic Consumer.
+ *
* @author author
*/
- public static class ConsumerBuilder
- {
+ public static class ConsumerBuilder {
/**
- * Set the host list
+ * Set the host list.
+ *
* @param hostList a comma-separated list of hosts to use to connect to MR
* @return this builder
*/
- public ConsumerBuilder usingHosts ( String hostList ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ public ConsumerBuilder usingHosts(String hostList) {
+ return usingHosts(MRConsumerImpl.stringToList(hostList));
}
/**
- * Set the host list
+ * Set the host list.
+ *
* @param hostSet a set of hosts to use to connect to MR
* @return this builder
*/
- public ConsumerBuilder usingHosts ( Collection<String> hostSet ) {
- fHosts = hostSet; return this;
+ public ConsumerBuilder usingHosts(Collection<String> hostSet) {
+ fHosts = hostSet;
+ return this;
}
/**
- * Set the topic
+ * Set the topic.
+ *
* @param topic the name of the topic to consume
* @return this builder
*/
- public ConsumerBuilder onTopic ( String topic ) {
- fTopic=topic;
- return this;
+ public ConsumerBuilder onTopic(String topic) {
+ fTopic = topic;
+ return this;
}
/**
- * Set the consumer's group and ID
+ * Set the consumer's group and ID.
+ *
* @param consumerGroup The name of the consumer group this consumer is part of
- * @param consumerId The unique id of this consumer in its group
+ * @param consumerId The unique id of this consumer in its group
* @return this builder
*/
- public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) {
- fGroup = consumerGroup;
- fId = consumerId;
- return this;
+ public ConsumerBuilder knownAs(String consumerGroup, String consumerId) {
+ fGroup = consumerGroup;
+ fId = consumerId;
+ return this;
}
/**
* Set the API key and secret for this client.
+ *
* @param apiKey
* @param apiSecret
* @return this builder
*/
- public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
+ public ConsumerBuilder authenticatedBy(String apiKey, String apiSecret) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
}
/**
- * Set the server side timeout
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
+ * Set the server side timeout.
+ *
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
* @return this builder
*/
- public ConsumerBuilder waitAtServer ( int timeoutMs ) {
- fTimeoutMs = timeoutMs;
- return this;
- };
+ public ConsumerBuilder waitAtServer(int timeoutMs) {
+ fTimeoutMs = timeoutMs;
+ return this;
+ }
/**
- * Set the maximum number of messages to receive per transaction
+ * Set the maximum number of messages to receive per transaction.
+ *
* @param limit The maximum number of messages to receive from the server in one transaction.
* @return this builder
*/
- public ConsumerBuilder receivingAtMost ( int limit ) {
- fLimit = limit;
- return this;
- };
+ public ConsumerBuilder receivingAtMost(int limit) {
+ fLimit = limit;
+ return this;
+ }
/**
- * Set a filter to use on the server
+ * Set a filter to use on the server.
+ *
* @param filter a Highland Park standard library filter encoded in JSON
* @return this builder
*/
- public ConsumerBuilder withServerSideFilter ( String filter ) {
- fFilter = filter;
- return this;
+ public ConsumerBuilder withServerSideFilter(String filter) {
+ fFilter = filter;
+ return this;
}
/**
- * Build the consumer
+ * Build the consumer.
+ *
* @return a consumer
*/
- public MRConsumer build ()
- {
- if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
- {
- throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
+ public MRConsumer build() {
+ if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
+ throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
}
- if ( fGroup == null )
- {
- fGroup = UUID.randomUUID ().toString ();
+ if (fGroup == null) {
+ fGroup = UUID.randomUUID().toString();
fId = "0";
- log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
+ logger.info("Creating non-restartable client with group {} and ID {}.", fGroup, fId);
}
- if ( sfConsumerMock != null ) return sfConsumerMock;
+ if (sfConsumerMock != null) {
+ return sfConsumerMock;
+ }
try {
return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(fHosts)
.setTopic(fTopic).setConsumerGroup(fGroup).setConsumerId(fId)
@@ -181,137 +192,142 @@ public class MRClientBuilders
private String fFilter = null;
}
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
+ //*************************************************************************/
+ //*************************************************************************/
+ //*************************************************************************/
/**
- * A publisher builder
+ * A publisher builder.
+ *
* @author author
*/
- public static class PublisherBuilder
- {
+ public static class PublisherBuilder {
/**
- * Set the MR/UEB host(s) to use
+ * Set the MR/UEB host(s) to use.
+ *
* @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
* @return this builder
*/
- public PublisherBuilder usingHosts ( String hostlist ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostlist) );
+ public PublisherBuilder usingHosts(String hostlist) {
+ return usingHosts(MRConsumerImpl.stringToList(hostlist));
}
/**
- * Set the MR/UEB host(s) to use
+ * Set the MR/UEB host(s) to use.
+ *
* @param hostSet The host(s) used in the URL to MR. Can be "host:port"
* @return this builder
*/
- public PublisherBuilder usingHosts ( String[] hostSet )
- {
- final TreeSet<String> hosts = new TreeSet<> ();
- for ( String hp : hostSet )
- {
- hosts.add ( hp );
- }
- return usingHosts ( hosts );
+ public PublisherBuilder usingHosts(String[] hostSet) {
+ final TreeSet<String> hosts = new TreeSet<>();
+ Collections.addAll(hosts, hostSet);
+ return usingHosts(hosts);
}
/**
- * Set the MR/UEB host(s) to use
+ * Set the MR/UEB host(s) to use.
+ *
* @param hostlist The host(s) used in the URL to MR. Can be "host:port".
* @return this builder
*/
- public PublisherBuilder usingHosts ( Collection<String> hostlist ) {
- fHosts=hostlist;
- return this;
+ public PublisherBuilder usingHosts(Collection<String> hostlist) {
+ fHosts = hostlist;
+ return this;
}
/**
- * Set the topic to publish on
+ * Set the topic to publish on.
+ *
* @param topic The topic on which to publish messages.
* @return this builder
*/
- public PublisherBuilder onTopic ( String topic ) {
- fTopic = topic;
- return this;
+ public PublisherBuilder onTopic(String topic) {
+ fTopic = topic;
+ return this;
}
/**
* Batch message sends with the given limits.
+ *
* @param messageCount The largest set of messages to batch.
- * @param ageInMs The maximum age of a message waiting in a batch.
+ * @param ageInMs The maximum age of a message waiting in a batch.
* @return this builder
*/
- public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) {
- fMaxBatchSize = messageCount;
- fMaxBatchAgeMs = ageInMs;
- return this;
+ public PublisherBuilder limitBatch(int messageCount, int ageInMs) {
+ fMaxBatchSize = messageCount;
+ fMaxBatchAgeMs = ageInMs;
+ return this;
}
/**
- * Compress transactions
+ * Compress transactions.
+ *
* @return this builder
*/
- public PublisherBuilder withCompresion () {
- return enableCompresion(true);
+ public PublisherBuilder withCompresion() {
+ return enableCompresion(true);
}
/**
- * Do not compress transactions
+ * Do not compress transactions.
+ *
* @return this builder
*/
- public PublisherBuilder withoutCompresion () {
- return enableCompresion(false);
+ public PublisherBuilder withoutCompresion() {
+ return enableCompresion(false);
}
/**
- * Set the compression option
+ * Set the compression option.
+ *
* @param compress true to gzip compress transactions
* @return this builder
*/
- public PublisherBuilder enableCompresion ( boolean compress ) {
- fCompress = compress;
- return this;
+ public PublisherBuilder enableCompresion(boolean compress) {
+ fCompress = compress;
+ return this;
}
/**
* Set the API key and secret for this client.
+ *
* @param apiKey
* @param apiSecret
* @return this builder
*/
- public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
+ public PublisherBuilder authenticatedBy(String apiKey, String apiSecret) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
}
/**
- * Build the publisher
+ * Build the publisher.
+ *
* @return a batching publisher
*/
- public MRBatchingPublisher build ()
- {
- if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
- {
- throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
+ public MRBatchingPublisher build() {
+ if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
+ throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
}
- if ( sfPublisherMock != null ) return sfPublisherMock;
-
- final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
- againstUrls ( fHosts ).
- onTopic ( fTopic ).
- batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
- compress ( fCompress ).
- build ();
- if ( fApiKey != null )
- {
- pub.setApiCredentials ( fApiKey, fApiSecret );
+ if (sfPublisherMock != null) {
+ return sfPublisherMock;
+ }
+
+ final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+ .againstUrls(fHosts)
+ .onTopic(fTopic)
+ .batchTo(fMaxBatchSize, fMaxBatchAgeMs)
+ .compress(fCompress)
+ .build();
+ if (fApiKey != null) {
+ pub.setApiCredentials(fApiKey, fApiSecret);
}
return pub;
}
-
+
private Collection<String> fHosts = null;
private String fTopic = null;
private int fMaxBatchSize = 1;
@@ -322,50 +338,54 @@ public class MRClientBuilders
}
/**
- * A builder for an identity manager
+ * A builder for an identity manager.
+ *
* @author author
*/
- public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
- {
+ public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> {
@Override
- protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
+ protected MRIdentityManager constructClient(Collection<String> hosts) {
+ try {
+ return new MRMetaClient(hosts);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
}
/**
- * A builder for a topic manager
+ * A builder for a topic manager.
+ *
* @author author
*/
- public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
- {
+ public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> {
@Override
- protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
- return new MRMetaClient ( hosts );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- } }
+ protected MRTopicManager constructClient(Collection<String> hosts) {
+ try {
+ return new MRMetaClient(hosts);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
}
/**
* Inject a consumer. Used to support unit tests.
+ *
* @param cc
*/
- public static void $testInject ( MRConsumer cc )
- {
+ public static void $testInject(MRConsumer cc) {
sfConsumerMock = cc;
}
/**
* Inject a publisher. Used to support unit tests.
+ *
* @param pub
*/
- public static void $testInject ( MRBatchingPublisher pub )
- {
+ public static void $testInject(MRBatchingPublisher pub) {
sfPublisherMock = pub;
}
@@ -373,69 +393,66 @@ public class MRClientBuilders
static MRBatchingPublisher sfPublisherMock = null;
/**
- * A builder for an identity manager
+ * A builder for an identity manager.
+ *
* @author author
*/
- public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
- {
- /**
- * Construct an identity manager builder.
- */
- public AbstractAuthenticatedManagerBuilder () {}
+ public abstract static class AbstractAuthenticatedManagerBuilder<T extends MRClient> {
/**
- * Set the host list
+ * Set the host list.
+ *
* @param hostList a comma-separated list of hosts to use to connect to MR
* @return this builder
*/
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) {
- return usingHosts ( MRConsumerImpl.stringToList(hostList) );
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts(String hostList) {
+ return usingHosts(MRConsumerImpl.stringToList(hostList));
}
/**
- * Set the host list
+ * Set the host list.
+ *
* @param hostSet a set of hosts to use to connect to MR
* @return this builder
*/
- public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) {
- fHosts = hostSet;
- return this;
+ public AbstractAuthenticatedManagerBuilder<T> usingHosts(Collection<String> hostSet) {
+ fHosts = hostSet;
+ return this;
}
/**
* Set the API key and secret for this client.
+ *
* @param apiKey
* @param apiSecret
* @return this builder
*/
- public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) {
- fApiKey = apiKey;
- fApiSecret = apiSecret;
- return this;
+ public AbstractAuthenticatedManagerBuilder<T> authenticatedBy(String apiKey, String apiSecret) {
+ fApiKey = apiKey;
+ fApiSecret = apiSecret;
+ return this;
}
/**
- * Build the consumer
+ * Build the consumer.
+ *
* @return a consumer
*/
- public T build ()
- {
- if ( fHosts.isEmpty() )
- {
- throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE );
+ public T build() {
+ if (fHosts.isEmpty()) {
+ throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
}
- final T mgr = constructClient ( fHosts );
- mgr.setApiCredentials ( fApiKey, fApiSecret );
+ final T mgr = constructClient(fHosts);
+ mgr.setApiCredentials(fApiKey, fApiSecret);
return mgr;
}
- protected abstract T constructClient ( Collection<String> hosts );
+ protected abstract T constructClient(Collection<String> hosts);
private Collection<String> fHosts = null;
private String fApiKey = null;
private String fApiSecret = null;
}
-
- private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
+
}