aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java269
1 files changed, 134 insertions, 135 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
index 4e2f4ecf..f4080b29 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
@@ -43,12 +43,12 @@ import org.slf4j.LoggerFactory;
public interface TopicEndpoint extends Startable, Lockable {
/**
- * singleton for global access
+ * singleton for global access.
*/
public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
/**
- * Add Topic Sources to the communication infrastructure initialized per properties
+ * Add Topic Sources to the communication infrastructure initialized per properties.
*
* @param properties properties for Topic Source construction
* @return a generic Topic Source
@@ -57,7 +57,7 @@ public interface TopicEndpoint extends Startable, Lockable {
public List<TopicSource> addTopicSources(Properties properties);
/**
- * Add Topic Sinks to the communication infrastructure initialized per properties
+ * Add Topic Sinks to the communication infrastructure initialized per properties.
*
* @param properties properties for Topic Sink construction
* @return a generic Topic Sink
@@ -66,14 +66,14 @@ public interface TopicEndpoint extends Startable, Lockable {
public List<TopicSink> addTopicSinks(Properties properties);
/**
- * gets all Topic Sources
+ * Gets all Topic Sources.
*
* @return the Topic Source List
*/
List<TopicSource> getTopicSources();
/**
- * get the Topic Sources for the given topic name
+ * Get the Topic Sources for the given topic name.
*
* @param topicNames the topic name
*
@@ -84,8 +84,8 @@ public interface TopicEndpoint extends Startable, Lockable {
public List<TopicSource> getTopicSources(List<String> topicNames);
/**
- * gets the Topic Source for the given topic name and underlying communication infrastructure
- * type
+ * Gets the Topic Source for the given topic name and underlying communication infrastructure
+ * type.
*
* @param commType communication infrastructure type
* @param topicName the topic name
@@ -99,7 +99,7 @@ public interface TopicEndpoint extends Startable, Lockable {
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName);
/**
- * get the UEB Topic Source for the given topic name
+ * Get the UEB Topic Source for the given topic name.
*
* @param topicName the topic name
*
@@ -111,7 +111,7 @@ public interface TopicEndpoint extends Startable, Lockable {
public UebTopicSource getUebTopicSource(String topicName);
/**
- * get the DMAAP Topic Source for the given topic name
+ * Get the DMAAP Topic Source for the given topic name.
*
* @param topicName the topic name
*
@@ -123,43 +123,48 @@ public interface TopicEndpoint extends Startable, Lockable {
public DmaapTopicSource getDmaapTopicSource(String topicName);
/**
- * get the Topic Sinks for the given topic name
+ * Get the Topic Sinks for the given topic name.
*
* @param topicNames the topic names
* @return the Topic Sink List
- * @throws IllegalStateException
- * @throws IllegalArgumentException
*/
public List<TopicSink> getTopicSinks(List<String> topicNames);
/**
- * get the Topic Sinks for the given topic name and underlying communication infrastructure type
+ * Get the Topic Sinks for the given topic name and all the underlying communication
+ * infrastructure type.
*
* @param topicName the topic name
- * @param commType communication infrastructure type
*
* @return the Topic Sink List
* @throws IllegalStateException if the entity is in an invalid state, for example multiple
* TopicWriters for a topic name and communication infrastructure
* @throws IllegalArgumentException if invalid parameters are present
*/
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
+ public List<TopicSink> getTopicSinks(String topicName);
+
+ /**
+ * Gets all Topic Sinks.
+ *
+ * @return the Topic Sink List
+ */
+ public List<TopicSink> getTopicSinks();
/**
- * get the Topic Sinks for the given topic name and all the underlying communication
- * infrastructure type
+ * Get the Topic Sinks for the given topic name and underlying communication infrastructure type.
*
* @param topicName the topic name
+ * @param commType communication infrastructure type
*
* @return the Topic Sink List
* @throws IllegalStateException if the entity is in an invalid state, for example multiple
* TopicWriters for a topic name and communication infrastructure
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<TopicSink> getTopicSinks(String topicName);
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
/**
- * get the UEB Topic Source for the given topic name
+ * Get the UEB Topic Source for the given topic name.
*
* @param topicName the topic name
*
@@ -171,7 +176,7 @@ public interface TopicEndpoint extends Startable, Lockable {
public UebTopicSink getUebTopicSink(String topicName);
/**
- * get the no-op Topic Sink for the given topic name
+ * Get the no-op Topic Sink for the given topic name.
*
* @param topicName the topic name
*
@@ -183,7 +188,7 @@ public interface TopicEndpoint extends Startable, Lockable {
public NoopTopicSink getNoopTopicSink(String topicName);
/**
- * get the DMAAP Topic Source for the given topic name
+ * Get the DMAAP Topic Source for the given topic name.
*
* @param topicName the topic name
*
@@ -195,42 +200,35 @@ public interface TopicEndpoint extends Startable, Lockable {
public DmaapTopicSink getDmaapTopicSink(String topicName);
/**
- * gets only the UEB Topic Sources
+ * Gets only the UEB Topic Sources.
*
* @return the UEB Topic Source List
*/
public List<UebTopicSource> getUebTopicSources();
/**
- * gets only the DMAAP Topic Sources
+ * Gets only the DMAAP Topic Sources.
*
* @return the DMAAP Topic Source List
*/
public List<DmaapTopicSource> getDmaapTopicSources();
/**
- * gets all Topic Sinks
- *
- * @return the Topic Sink List
- */
- public List<TopicSink> getTopicSinks();
-
- /**
- * gets only the UEB Topic Sinks
+ * Gets only the UEB Topic Sinks.
*
* @return the UEB Topic Sink List
*/
public List<UebTopicSink> getUebTopicSinks();
/**
- * gets only the DMAAP Topic Sinks
+ * Gets only the DMAAP Topic Sinks.
*
* @return the DMAAP Topic Sink List
*/
public List<DmaapTopicSink> getDmaapTopicSinks();
/**
- * gets only the NOOP Topic Sinks
+ * Gets only the NOOP Topic Sinks.
*
* @return the NOOP Topic Sinks List
*/
@@ -244,20 +242,20 @@ public interface TopicEndpoint extends Startable, Lockable {
/**
* This implementation of the Topic Endpoint Manager, proxies operations to appropriate
- * implementations according to the communication infrastructure that are supported
+ * implementations according to the communication infrastructure that are supported.
*/
class ProxyTopicEndpointManager implements TopicEndpoint {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
/**
- * Is this element locked?
+ * Is this element locked boolean.
*/
protected volatile boolean locked = false;
/**
- * Is this element alive?
+ * Is this element alive boolean.
*/
protected volatile boolean alive = false;
@@ -311,6 +309,36 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
return sources;
}
+
+ @Override
+ public List<TopicSource> getTopicSources(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSource> sources = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSource uebSource = this.getUebTopicSource(topic);
+ if (uebSource != null) {
+ sources.add(uebSource);
+ }
+ } catch (final Exception e) {
+ logger.debug("No UEB source for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+ if (dmaapSource != null) {
+ sources.add(dmaapSource);
+ }
+ } catch (final Exception e) {
+ logger.debug("No DMAAP source for topic: {}", topic, e);
+ }
+ }
+ return sources;
+ }
@Override
public List<TopicSink> getTopicSinks() {
@@ -324,6 +352,74 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
return sinks;
}
+ @Override
+ public List<TopicSink> getTopicSinks(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSink uebSink = this.getUebTopicSink(topic);
+ if (uebSink != null) {
+ sinks.add(uebSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No UEB sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+ if (dmaapSink != null) {
+ sinks.add(dmaapSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No DMAAP sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink noopSink = this.getNoopTopicSink(topic);
+ if (noopSink != null) {
+ sinks.add(noopSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No NOOP sink for topic: {}", topic, e);
+ }
+ }
+ return sinks;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(String topicName) {
+ if (topicName == null) {
+ throw parmException(topicName);
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ try {
+ sinks.add(this.getUebTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ try {
+ sinks.add(this.getDmaapTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ try {
+ sinks.add(this.getNoopTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ return sinks;
+ }
+
@JsonIgnore
@Override
public List<UebTopicSource> getUebTopicSources() {
@@ -412,6 +508,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
}
/**
+ * Gets the endpoints.
*
* @return list of managed endpoints
*/
@@ -489,75 +586,6 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
}
@Override
- public List<TopicSource> getTopicSources(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSource> sources = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null) {
- sources.add(uebSource);
- }
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
-
- try {
- final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null) {
- sources.add(dmaapSource);
- }
- } catch (final Exception e) {
- logger.debug("No DMAAP source for topic: {}", topic, e);
- }
- }
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null) {
- sinks.add(uebSink);
- }
- } catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null) {
- sinks.add(dmaapSink);
- }
- } catch (final Exception e) {
- logger.debug("No DMAAP sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink noopSink = this.getNoopTopicSink(topic);
- if (noopSink != null) {
- sinks.add(noopSink);
- }
- } catch (final Exception e) {
- logger.debug("No NOOP sink for topic: {}", topic, e);
- }
- }
- return sinks;
- }
-
- @Override
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
if (commType == null) {
@@ -605,35 +633,6 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
}
}
- @Override
- public List<TopicSink> getTopicSinks(String topicName) {
- if (topicName == null) {
- throw parmException(topicName);
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
-
- try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getNoopTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- return sinks;
- }
-
private void logNoSink(String topicName, Exception ex) {
logger.debug("No sink for topic: {}", topicName, ex);
}