diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
11 files changed, 97 insertions, 94 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 6d34d32b..034ac66e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -42,12 +42,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Wrapper around libraries to consume from message bus + * Wrapper around libraries to consume from message bus. */ public interface BusConsumer { /** - * fetch messages + * fetch messages. * * @return list of messages * @throws Exception when error encountered by underlying libraries @@ -55,7 +55,7 @@ public interface BusConsumer { public Iterable<String> fetch() throws InterruptedException, IOException; /** - * close underlying library consumer + * close underlying library consumer. */ public void close(); @@ -74,12 +74,12 @@ public interface BusConsumer { } /** - * Cambria based consumer + * Cambria based consumer. */ public static class CambriaConsumerWrapper implements FilterableBusConsumer { /** - * logger + * logger. */ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); @@ -94,29 +94,29 @@ public interface BusConsumer { private final Object consLocker = new Object(); /** - * Cambria client + * Cambria client. */ private CambriaConsumer consumer; /** - * Cambria client to use for next fetch + * Cambria client to use for next fetch. */ private CambriaConsumer newConsumer = null; /** - * fetch timeout + * fetch timeout. */ protected int fetchTimeout; /** - * close condition + * close condition. */ protected Object closeCondition = new Object(); /** - * Cambria Consumer Wrapper + * Cambria Consumer Wrapper. * BusTopicParam object contains the following parameters - * servers messaging bus hosts + * servers messaging bus hosts. * topic topic * apiKey API Key * apiSecret API Secret @@ -125,9 +125,9 @@ public interface BusConsumer { * fetchTimeout Fetch Timeout * fetchLimit Fetch Limit * - * @param busTopicParams - * @throws GeneralSecurityException - * @throws MalformedURLException + * @param busTopicParams - The parameters for the bus topic + * @throws GeneralSecurityException - Security exception + * @throws MalformedURLException - Malformed URL exception */ public CambriaConsumerWrapper(BusTopicParams busTopicParams) { @@ -244,12 +244,12 @@ public interface BusConsumer { } /** - * MR based consumer + * MR based consumer. */ public abstract class DmaapConsumerWrapper implements BusConsumer { /** - * logger + * logger. */ private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); @@ -259,24 +259,24 @@ public interface BusConsumer { protected static final String PROTOCOL_PROP = "Protocol"; /** - * fetch timeout + * fetch timeout. */ protected int fetchTimeout; /** - * close condition + * close condition. */ protected Object closeCondition = new Object(); /** - * MR Consumer + * MR Consumer. */ protected MRConsumerImpl consumer; /** - * MR Consumer Wrapper - * <p> - * servers messaging bus hosts + * MR Consumer Wrapper. + * + * <p>servers messaging bus hosts * topic topic * apiKey API Key * apiSecret API Secret @@ -288,7 +288,7 @@ public interface BusConsumer { * fetchLimit Fetch Limit * * @param busTopicParams contains above listed attributes - * @throws MalformedURLException + * @throws MalformedURLException URL should be valid */ public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { @@ -360,7 +360,7 @@ public interface BusConsumer { } /** - * MR based consumer + * MR based consumer. */ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { @@ -369,10 +369,10 @@ public interface BusConsumer { private final Properties props; /** - * BusTopicParams contain the following parameters - * MR Consumer Wrapper - * <p> - * servers messaging bus hosts + * BusTopicParams contain the following parameters. + * MR Consumer Wrapper. + * + * <p>servers messaging bus hosts * topic topic * apiKey API Key * apiSecret API Secret @@ -384,7 +384,7 @@ public interface BusConsumer { * fetchLimit Fetch Limit * * @param busTopicParams contains above listed params - * @throws MalformedURLException + * @throws MalformedURLException URL should be valid */ public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 348100ab..05e92638 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory; public interface BusPublisher { /** - * sends a message + * sends a message. * * @param partitionId id * @param message the message @@ -56,19 +56,19 @@ public interface BusPublisher { public boolean send(String partitionId, String message); /** - * closes the publisher + * closes the publisher. */ public void close(); /** - * Cambria based library publisher + * Cambria based library publisher. */ public static class CambriaPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); /** - * The actual Cambria publisher + * The actual Cambria publisher. */ @JsonIgnore protected volatile CambriaBatchingPublisher publisher; @@ -147,20 +147,20 @@ public interface BusPublisher { } /** - * DmaapClient library wrapper + * DmaapClient library wrapper. */ public abstract class DmaapPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); /** - * MR based Publisher + * MR based Publisher. */ protected MRSimplerBatchPublisher publisher; protected Properties props; /** - * MR Publisher Wrapper + * MR Publisher Wrapper. * * @param servers messaging bus hosts * @param topic topic @@ -278,11 +278,11 @@ public interface BusPublisher { } /** - * DmaapClient library wrapper + * DmaapClient library wrapper. */ public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { /** - * MR based Publisher + * MR based Publisher. */ public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword, boolean useHttps) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index 08993126..3e112f90 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -25,41 +25,40 @@ import java.util.List; import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled; /** - * Bus Topic Base + * Bus Topic Base. */ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { /** - * API Key + * API Key. */ protected String apiKey; /** - * API Secret + * API Secret. */ protected String apiSecret; /** - * Use https + * Use https. */ protected boolean useHttps; /** - * allow self signed certificates + * allow self signed certificates. */ protected boolean allowSelfSignedCerts; /** - * Instantiates a new Bus Topic Base + * Instantiates a new Bus Topic Base. * - * servers list of servers + * <p>servers list of servers * topic topic name * apiKey API Key * apiSecret API Secret * useHttps does connection use HTTPS? * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams - * @return a Bus Topic Base + * @param busTopicParams holds all our parameters * @throws IllegalArgumentException if invalid parameters are present */ public BusTopicBase(BusTopicParams busTopicParams) { @@ -81,6 +80,8 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { } /** + * Is using HTTPS. + * * @return if using https */ public boolean isUseHttps() { @@ -88,6 +89,8 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { } /** + * Is self signed certificates allowed. + * * @return if self signed certificates are allowed */ public boolean isAllowSelfSignedCerts() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index ffefcbf2..f9eca081 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -24,8 +24,9 @@ import java.util.List; import java.util.Map; /** - * Member variables of this Params class are as follows - * servers DMaaP servers + * Member variables of this Params class are as follows. + * + * <p>servers DMaaP servers * topic DMaaP Topic to be monitored * apiKey DMaaP API Key (optional) * apiSecret DMaaP API Secret (optional) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 5493468a..043b8673 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -21,7 +21,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import java.util.List; import java.util.UUID; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink; @@ -36,31 +35,31 @@ import org.slf4j.LoggerFactory; public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { /** - * loggers + * Loggers. */ private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); /** - * The partition key to publish to + * The partition key to publish to. */ protected String partitionId; /** - * message bus publisher + * Message bus publisher. */ protected BusPublisher publisher; /** - * constructor for abstract sink + * Constructor for abstract sink. * @param busTopicParams contains below listed attributes - * servers servers - * topic topic - * apiKey api secret - * apiSecret api secret - * partitionId partition id - * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow * + * servers servers + * topic topic + * apiKey api secret + * apiSecret api secret + * partitionId partition id + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException in invalid parameters are passed in */ public InlineBusTopicSink(BusTopicParams busTopicParams) { @@ -75,7 +74,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi } /** - * Initialize the Bus publisher + * Initialize the Bus publisher. */ public abstract void init(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index 3dd40312..5c18fb3f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -49,7 +49,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop protected Map<String, String> additionalProps = null; /** - * BusTopicParams contains the below mentioned attributes + * BusTopicParams contains the below mentioned attributes. * servers DMaaP servers * topic DMaaP Topic to be monitored * apiKey DMaaP API Key (optional) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index 218e44b4..b07a1966 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -21,8 +21,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import java.util.List; - import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.slf4j.Logger; @@ -35,15 +33,15 @@ import org.slf4j.LoggerFactory; public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { /** - * logger + * Logger. */ private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); /** * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned - * attributes + * attributes. * - * servers list of UEB servers available for publishing + * <p>servers list of UEB servers available for publishing * topic the topic to publish to * apiKey the api key (optional) * apiSecret the api secret (optional) @@ -58,7 +56,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi } /** - * Instantiation of internal resources + * Instantiation of internal resources. */ @Override public void init() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 400cbfe2..e4b335c3 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; /** * This topic source implementation specializes in reading messages over a bus topic source and - * notifying its listeners + * notifying its listeners. */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase implements Runnable, BusTopicSource, FilterableTopicSource { @@ -47,32 +47,32 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); /** - * Bus consumer group + * Bus consumer group. */ protected final String consumerGroup; /** - * Bus consumer instance + * Bus consumer instance. */ protected final String consumerInstance; /** - * Bus fetch timeout + * Bus fetch timeout. */ protected final int fetchTimeout; /** - * Bus fetch limit + * Bus fetch limit. */ protected final int fetchLimit; /** - * Message Bus Consumer + * Message Bus Consumer. */ protected BusConsumer consumer; /** - * Independent thread reading message over my topic + * Independent thread reading message over my topic. */ protected Thread busPollerThread; @@ -113,7 +113,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } /** - * Initialize the Bus client + * Initialize the Bus client. */ public abstract void init() throws MalformedURLException; @@ -204,7 +204,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } /** - * Run thread method for the Bus Reader + * Run thread method for the Bus Reader. */ @Override public void run() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 65f75aa5..0fcb86b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; /** * This topic reader implementation specializes in reading messages over DMAAP topic and notifying - * its listeners + * its listeners. */ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { @@ -73,14 +73,15 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource try { this.init(); } catch (Exception e) { - logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); + logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", + topic, e.getMessage(), e); throw new IllegalArgumentException(e); } } /** - * Initialize the Cambria or MR Client + * Initialize the Cambria or MR Client. */ @Override public void init() throws MalformedURLException { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index fb20ccc4..23d3edca 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -26,11 +26,13 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; /** * This topic source implementation specializes in reading messages over an UEB Bus topic source and - * notifying its listeners + * notifying its listeners. */ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { /** + * Constructor. + * * @param busTopicParams Parameters object containing all the required inputs * @throws IllegalArgumentException An invalid parameter passed in */ @@ -46,7 +48,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i } /** - * Initialize the Cambria client + * Initialize the Cambria client. */ @Override public void init() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index ed15ddf7..80664554 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -32,50 +32,49 @@ import org.slf4j.LoggerFactory; public abstract class TopicBase implements Topic { /** - * logger + * Logger. */ private static Logger logger = LoggerFactory.getLogger(TopicBase.class); /** - * list of servers + * List of servers. */ protected List<String> servers; /** - * Topic + * Topic. */ protected String topic; /** - * event cache + * Event cache. */ protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10); /** * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() => - * !alive + * !alive. */ protected volatile boolean alive = false; /** * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is - * obvious since locked => !alive) + * obvious since locked => !alive). */ protected volatile boolean locked = false; /** - * All my subscribers for new message notifications + * All my subscribers for new message notifications. */ protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); /** - * Instantiates a new Topic Base + * Instantiates a new Topic Base. * * @param servers list of servers * @param topic topic name * - * @return a Topic Base * @throws IllegalArgumentException if invalid parameters are present */ public TopicBase(List<String> servers, String topic) { @@ -127,7 +126,7 @@ public abstract class TopicBase implements Topic { } /** - * broadcast event to all listeners + * Broadcast event to all listeners. * * @param message the event * @return true if all notifications are performed with no error, false otherwise @@ -148,7 +147,7 @@ public abstract class TopicBase implements Topic { } /** - * take a snapshot of current topic listeners + * Take a snapshot of current topic listeners. * * @return the topic listeners */ |