diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java | 269 |
1 files changed, 134 insertions, 135 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 4e2f4ecf..f4080b29 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -43,12 +43,12 @@ import org.slf4j.LoggerFactory; public interface TopicEndpoint extends Startable, Lockable { /** - * singleton for global access + * singleton for global access. */ public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); /** - * Add Topic Sources to the communication infrastructure initialized per properties + * Add Topic Sources to the communication infrastructure initialized per properties. * * @param properties properties for Topic Source construction * @return a generic Topic Source @@ -57,7 +57,7 @@ public interface TopicEndpoint extends Startable, Lockable { public List<TopicSource> addTopicSources(Properties properties); /** - * Add Topic Sinks to the communication infrastructure initialized per properties + * Add Topic Sinks to the communication infrastructure initialized per properties. * * @param properties properties for Topic Sink construction * @return a generic Topic Sink @@ -66,14 +66,14 @@ public interface TopicEndpoint extends Startable, Lockable { public List<TopicSink> addTopicSinks(Properties properties); /** - * gets all Topic Sources + * Gets all Topic Sources. * * @return the Topic Source List */ List<TopicSource> getTopicSources(); /** - * get the Topic Sources for the given topic name + * Get the Topic Sources for the given topic name. * * @param topicNames the topic name * @@ -84,8 +84,8 @@ public interface TopicEndpoint extends Startable, Lockable { public List<TopicSource> getTopicSources(List<String> topicNames); /** - * gets the Topic Source for the given topic name and underlying communication infrastructure - * type + * Gets the Topic Source for the given topic name and underlying communication infrastructure + * type. * * @param commType communication infrastructure type * @param topicName the topic name @@ -99,7 +99,7 @@ public interface TopicEndpoint extends Startable, Lockable { public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** - * get the UEB Topic Source for the given topic name + * Get the UEB Topic Source for the given topic name. * * @param topicName the topic name * @@ -111,7 +111,7 @@ public interface TopicEndpoint extends Startable, Lockable { public UebTopicSource getUebTopicSource(String topicName); /** - * get the DMAAP Topic Source for the given topic name + * Get the DMAAP Topic Source for the given topic name. * * @param topicName the topic name * @@ -123,43 +123,48 @@ public interface TopicEndpoint extends Startable, Lockable { public DmaapTopicSource getDmaapTopicSource(String topicName); /** - * get the Topic Sinks for the given topic name + * Get the Topic Sinks for the given topic name. * * @param topicNames the topic names * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException */ public List<TopicSink> getTopicSinks(List<String> topicNames); /** - * get the Topic Sinks for the given topic name and underlying communication infrastructure type + * Get the Topic Sinks for the given topic name and all the underlying communication + * infrastructure type. * * @param topicName the topic name - * @param commType communication infrastructure type * * @return the Topic Sink List * @throws IllegalStateException if the entity is in an invalid state, for example multiple * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); + public List<TopicSink> getTopicSinks(String topicName); + + /** + * Gets all Topic Sinks. + * + * @return the Topic Sink List + */ + public List<TopicSink> getTopicSinks(); /** - * get the Topic Sinks for the given topic name and all the underlying communication - * infrastructure type + * Get the Topic Sinks for the given topic name and underlying communication infrastructure type. * * @param topicName the topic name + * @param commType communication infrastructure type * * @return the Topic Sink List * @throws IllegalStateException if the entity is in an invalid state, for example multiple * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public List<TopicSink> getTopicSinks(String topicName); + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** - * get the UEB Topic Source for the given topic name + * Get the UEB Topic Source for the given topic name. * * @param topicName the topic name * @@ -171,7 +176,7 @@ public interface TopicEndpoint extends Startable, Lockable { public UebTopicSink getUebTopicSink(String topicName); /** - * get the no-op Topic Sink for the given topic name + * Get the no-op Topic Sink for the given topic name. * * @param topicName the topic name * @@ -183,7 +188,7 @@ public interface TopicEndpoint extends Startable, Lockable { public NoopTopicSink getNoopTopicSink(String topicName); /** - * get the DMAAP Topic Source for the given topic name + * Get the DMAAP Topic Source for the given topic name. * * @param topicName the topic name * @@ -195,42 +200,35 @@ public interface TopicEndpoint extends Startable, Lockable { public DmaapTopicSink getDmaapTopicSink(String topicName); /** - * gets only the UEB Topic Sources + * Gets only the UEB Topic Sources. * * @return the UEB Topic Source List */ public List<UebTopicSource> getUebTopicSources(); /** - * gets only the DMAAP Topic Sources + * Gets only the DMAAP Topic Sources. * * @return the DMAAP Topic Source List */ public List<DmaapTopicSource> getDmaapTopicSources(); /** - * gets all Topic Sinks - * - * @return the Topic Sink List - */ - public List<TopicSink> getTopicSinks(); - - /** - * gets only the UEB Topic Sinks + * Gets only the UEB Topic Sinks. * * @return the UEB Topic Sink List */ public List<UebTopicSink> getUebTopicSinks(); /** - * gets only the DMAAP Topic Sinks + * Gets only the DMAAP Topic Sinks. * * @return the DMAAP Topic Sink List */ public List<DmaapTopicSink> getDmaapTopicSinks(); /** - * gets only the NOOP Topic Sinks + * Gets only the NOOP Topic Sinks. * * @return the NOOP Topic Sinks List */ @@ -244,20 +242,20 @@ public interface TopicEndpoint extends Startable, Lockable { /** * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported + * implementations according to the communication infrastructure that are supported. */ class ProxyTopicEndpointManager implements TopicEndpoint { /** - * Logger + * Logger. */ private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); /** - * Is this element locked? + * Is this element locked boolean. */ protected volatile boolean locked = false; /** - * Is this element alive? + * Is this element alive boolean. */ protected volatile boolean alive = false; @@ -311,6 +309,36 @@ class ProxyTopicEndpointManager implements TopicEndpoint { return sources; } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) { + sources.add(uebSource); + } + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) { + sources.add(dmaapSource); + } + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } @Override public List<TopicSink> getTopicSinks() { @@ -324,6 +352,74 @@ class ProxyTopicEndpointManager implements TopicEndpoint { return sinks; } + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) { + sinks.add(uebSink); + } + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) { + sinks.add(dmaapSink); + } + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) { + sinks.add(noopSink); + } + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw parmException(topicName); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + return sinks; + } + @JsonIgnore @Override public List<UebTopicSource> getUebTopicSources() { @@ -412,6 +508,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint { } /** + * Gets the endpoints. * * @return list of managed endpoints */ @@ -489,75 +586,6 @@ class ProxyTopicEndpointManager implements TopicEndpoint { } @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSink> sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { @@ -605,35 +633,6 @@ class ProxyTopicEndpointManager implements TopicEndpoint { } } - @Override - public List<TopicSink> getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - private void logNoSink(String topicName, Exception ex) { logger.debug("No sink for topic: {}", topicName, ex); } |