diff options
author | Jorge Hernandez <jh1730@att.com> | 2017-09-12 10:27:08 -0500 |
---|---|---|
committer | Jorge Hernandez <jh1730@att.com> | 2017-09-12 10:27:08 -0500 |
commit | 621a6eb53d1fdfa129c794c15a8211b56c6a0346 (patch) | |
tree | 7c15ae3ac711e51d2993bad59268ca94ba2a1501 /policy-endpoints/src/main/java/org | |
parent | 0c625eb6e2349556f1eae87deb2fb7af7fd6132f (diff) |
missing check for noop sinks
Change-Id: If7167415c361fad2478809ac6c41981beaadacd6
Issue-ID: POLICY-119
Signed-off-by: Jorge Hernandez <jh1730@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org')
2 files changed, 796 insertions, 776 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java index cc3705ee..09ee9a4e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; import org.onap.policy.drools.event.comm.bus.NoopTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSource; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.onap.policy.drools.properties.Lockable; import org.onap.policy.drools.properties.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Abstraction to managed the system's Networked Topic Endpoints, - * sources of all events input into the System. + * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into + * the System. */ public interface TopicEndpoint extends Startable, Lockable { - - /** - * Add Topic Sources to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Source construction - * @return a generic Topic Source - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSource> addTopicSources(Properties properties); - - /** - * Add Topic Sinks to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Sink construction - * @return a generic Topic Sink - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSink> addTopicSinks(Properties properties); - - /** - * gets all Topic Sources - * @return the Topic Source List - */ - List<TopicSource> getTopicSources(); - - /** - * get the Topic Sources for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source List - * @throws IllegalStateException if the entity is in an invalid state - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSource> getTopicSources(List<String> topicNames); - - /** - * gets the Topic Source for the given topic name and - * underlying communication infrastructure type - * - * @param commType communication infrastructure type - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - * @throws UnsupportedOperationException if the operation is not supported. - */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource getUebTopicSource(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource getDmaapTopicSource(String topicName); - - /** - * get the Topic Sinks for the given topic name - * - * @param topicNames the topic names - * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException - */ - public List<TopicSink> getTopicSinks(List<String> topicNames); - - /** - * get the Topic Sinks for the given topic name and - * underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the Topic Sinks for the given topic name and - * all the underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSink> getTopicSinks(String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink getUebTopicSink(String topicName); - - /** - * get the no-op Topic Sink for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink getNoopTopicSink(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * gets only the UEB Topic Sources - * @return the UEB Topic Source List - */ - public List<UebTopicSource> getUebTopicSources(); - - /** - * gets only the DMAAP Topic Sources - * @return the DMAAP Topic Source List - */ - public List<DmaapTopicSource> getDmaapTopicSources(); - - /** - * gets all Topic Sinks - * @return the Topic Sink List - */ - public List<TopicSink> getTopicSinks(); - - /** - * gets only the UEB Topic Sinks - * @return the UEB Topic Sink List - */ - public List<UebTopicSink> getUebTopicSinks(); - - /** - * gets only the DMAAP Topic Sinks - * @return the DMAAP Topic Sink List - */ - public List<DmaapTopicSink> getDmaapTopicSinks(); - - /** - * gets only the NOOP Topic Sinks - * @return the NOOP Topic Sinks List - */ - public List<NoopTopicSink> getNoopTopicSinks(); - - /** - * singleton for global access - */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + + /** + * Add Topic Sources to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Source construction + * @return a generic Topic Source + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSource> addTopicSources(Properties properties); + + /** + * Add Topic Sinks to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Sink construction + * @return a generic Topic Sink + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSink> addTopicSinks(Properties properties); + + /** + * gets all Topic Sources + * + * @return the Topic Source List + */ + List<TopicSource> getTopicSources(); + + /** + * get the Topic Sources for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source List + * @throws IllegalStateException if the entity is in an invalid state + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSource> getTopicSources(List<String> topicNames); + + /** + * gets the Topic Source for the given topic name and underlying communication infrastructure type + * + * @param commType communication infrastructure type + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + * @throws UnsupportedOperationException if the operation is not supported. + */ + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the UEB Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource getUebTopicSource(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the DMAAP Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * get the Topic Sinks for the given topic name + * + * @param topicNames the topic names + * @return the Topic Sink List + * @throws IllegalStateException + * @throws IllegalArgumentException + */ + public List<TopicSink> getTopicSinks(List<String> topicNames); + + /** + * get the Topic Sinks for the given topic name and underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the Topic Sinks for the given topic name and all the underlying communication + * infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSink> getTopicSinks(String topicName); + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink getUebTopicSink(String topicName); + + /** + * get the no-op Topic Sink for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink getNoopTopicSink(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink getDmaapTopicSink(String topicName); + + /** + * gets only the UEB Topic Sources + * + * @return the UEB Topic Source List + */ + public List<UebTopicSource> getUebTopicSources(); + + /** + * gets only the DMAAP Topic Sources + * + * @return the DMAAP Topic Source List + */ + public List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * gets all Topic Sinks + * + * @return the Topic Sink List + */ + public List<TopicSink> getTopicSinks(); + + /** + * gets only the UEB Topic Sinks + * + * @return the UEB Topic Sink List + */ + public List<UebTopicSink> getUebTopicSinks(); + + /** + * gets only the DMAAP Topic Sinks + * + * @return the DMAAP Topic Sink List + */ + public List<DmaapTopicSink> getDmaapTopicSinks(); + + /** + * gets only the NOOP Topic Sinks + * + * @return the NOOP Topic Sinks List + */ + public List<NoopTopicSink> getNoopTopicSinks(); + + /** + * singleton for global access + */ + public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); } + /* * ----------------- implementation ------------------- */ @@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable { * implementations according to the communication infrastructure that are supported */ class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.start() && success; - } catch (Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other - * words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.stop() && success; - } catch (Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (locked) - return true; - - this.locked = true; - } - - for (TopicSource source: this.getTopicSources()) { - source.lock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!locked) - return true; - - this.locked = false; - } - - for (TopicSource source: this.getTopicSources()) { - source.unlock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSource> sources = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) - sources.add(uebSource); - } catch (Exception e) { - logger.info("No UEB source for topic: {}", topic, e); - } - - try { - TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) - sources.add(dmaapSource); - } catch (Exception e) { - logger.info("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSink> sinks = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) - sinks.add(uebSink); - } catch (Exception e) { - logger.info("No UEB sink for topic: {}", topic, e); - } - - try { - TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) - sinks.add(dmaapSink); - } catch (Exception e) { - logger.info("No DMAAP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - return sinks; - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + NoopTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) + return true; + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) + sources.add(uebSource); + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) + sources.add(dmaapSource); + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) + sinks.add(uebSink); + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) + sinks.add(dmaapSink); + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) + sinks.add(noopSink); + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + return sinks; + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java index 946e48c0..53315391 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory; * Noop Topic Sink Factory */ public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<NoopTopicSink> build(Properties properties); - - /** - * builds a noop sink - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink build(List<String> servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets a sink based on topic name - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - public NoopTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers - * @return a list of the UEB Topic Writers - */ - public List<NoopTopicSink> inventory(); - - /** - * Destroys all sinks - */ - public void destroy(); + + /** + * Creates noop topic sinks based on properties files + * + * @param properties Properties containing initialization values + * + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<NoopTopicSink> build(Properties properties); + + /** + * builds a noop sink + * + * @param servers list of servers + * @param topic topic name + * @param managed is this sink endpoint managed? + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink build(List<String> servers, String topic, boolean managed); + + /** + * Destroys a sink based on the topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets a sink based on topic name + * + * @param topic the topic name + * + * @return a sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the sink is in an incorrect state + */ + public NoopTopicSink get(String topic); + + /** + * Provides a snapshot of the UEB Topic Writers + * + * @return a list of the UEB Topic Writers + */ + public List<NoopTopicSink> inventory(); + + /** + * Destroys all sinks + */ + public void destroy(); } + /* ------------- implementation ----------------- */ /** * Factory of noop sinks */ class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * noop topic sinks map - */ - protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); - - @Override - public List<NoopTopicSink> build(Properties properties) { - - String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); - synchronized(this) { - for (String topic: sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) - servers = "noop"; - - List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List<String> servers, String topic, boolean managed) { - if (servers == null) { - servers = new ArrayList<>(); - } - - if (servers.isEmpty()) { - servers.add("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized (this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } - - NoopTopicSink sink = - new NoopTopicSink(servers, topic); - - if (managed) - noopTopicSinks.put(topic, sink); - - return sink; - } - } - - @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - NoopTopicSink noopSink; - synchronized(this) { - if (!noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); - } - - @Override - public void destroy() { - List<NoopTopicSink> sinks = this.inventory(); - for (NoopTopicSink sink: sinks) { - sink.shutdown(); - } - - synchronized(this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized(this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public List<NoopTopicSink> inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); + + @Override + public List<NoopTopicSink> build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List<String> sinkTopicList = + new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) + servers = "noop"; + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = + properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List<String> servers, String topic, boolean managed) { + if (servers == null) { + servers = new ArrayList<>(); + } + + if (servers.isEmpty()) { + servers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(servers, topic); + + if (managed) + this.noopTopicSinks.put(topic, sink); + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List<NoopTopicSink> sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List<NoopTopicSink> inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } } |