diff options
Diffstat (limited to 'policy-endpoints/src')
22 files changed, 2010 insertions, 1129 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index f4080b29..52c1f07b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,16 @@ package org.onap.policy.common.endpoints.event.comm; -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.ArrayList; import java.util.List; import java.util.Properties; - import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -45,7 +40,7 @@ public interface TopicEndpoint extends Startable, Lockable { /** * singleton for global access. */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + TopicEndpoint manager = new TopicEndpointProxy(); /** * Add Topic Sources to the communication infrastructure initialized per properties. @@ -54,7 +49,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Source * @throws IllegalArgumentException when invalid arguments are provided */ - public List<TopicSource> addTopicSources(Properties properties); + List<TopicSource> addTopicSources(Properties properties); /** * Add Topic Sinks to the communication infrastructure initialized per properties. @@ -63,7 +58,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Sink * @throws IllegalArgumentException when invalid arguments are provided */ - public List<TopicSink> addTopicSinks(Properties properties); + List<TopicSink> addTopicSinks(Properties properties); /** * Gets all Topic Sources. @@ -81,7 +76,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalStateException if the entity is in an invalid state * @throws IllegalArgumentException if invalid parameters are present */ - public List<TopicSource> getTopicSources(List<String> topicNames); + List<TopicSource> getTopicSources(List<String> topicNames); /** * Gets the Topic Source for the given topic name and underlying communication infrastructure @@ -96,7 +91,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalArgumentException if invalid parameters are present * @throws UnsupportedOperationException if the operation is not supported. */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); + TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -108,7 +103,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource getUebTopicSource(String topicName); + UebTopicSource getUebTopicSource(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -120,7 +115,15 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource getDmaapTopicSource(String topicName); + DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * Get the Noop Source for the given topic name. + * + * @param topicName the topic name. + * @return the Noop Source. + */ + NoopTopicSource getNoopTopicSource(String topicName); /** * Get the Topic Sinks for the given topic name. @@ -128,7 +131,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @param topicNames the topic names * @return the Topic Sink List */ - public List<TopicSink> getTopicSinks(List<String> topicNames); + List<TopicSink> getTopicSinks(List<String> topicNames); /** * Get the Topic Sinks for the given topic name and all the underlying communication @@ -141,14 +144,14 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public List<TopicSink> getTopicSinks(String topicName); + List<TopicSink> getTopicSinks(String topicName); /** * Gets all Topic Sinks. * * @return the Topic Sink List */ - public List<TopicSink> getTopicSinks(); + List<TopicSink> getTopicSinks(); /** * Get the Topic Sinks for the given topic name and underlying communication infrastructure type. @@ -161,7 +164,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); + TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -173,7 +176,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSink getUebTopicSink(String topicName); + UebTopicSink getUebTopicSink(String topicName); /** * Get the no-op Topic Sink for the given topic name. @@ -185,7 +188,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public NoopTopicSink getNoopTopicSink(String topicName); + NoopTopicSink getNoopTopicSink(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -197,469 +200,47 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink getDmaapTopicSink(String topicName); + DmaapTopicSink getDmaapTopicSink(String topicName); /** * Gets only the UEB Topic Sources. * * @return the UEB Topic Source List */ - public List<UebTopicSource> getUebTopicSources(); + List<UebTopicSource> getUebTopicSources(); /** * Gets only the DMAAP Topic Sources. * * @return the DMAAP Topic Source List */ - public List<DmaapTopicSource> getDmaapTopicSources(); + List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * Gets only the NOOP Topic Sources. + * + * @return the NOOP Topic Source List + */ + List<NoopTopicSource> getNoopTopicSources(); /** * Gets only the UEB Topic Sinks. * * @return the UEB Topic Sink List */ - public List<UebTopicSink> getUebTopicSinks(); + List<UebTopicSink> getUebTopicSinks(); /** * Gets only the DMAAP Topic Sinks. * * @return the DMAAP Topic Sink List */ - public List<DmaapTopicSink> getDmaapTopicSinks(); + List<DmaapTopicSink> getDmaapTopicSinks(); /** * Gets only the NOOP Topic Sinks. * * @return the NOOP Topic Sinks List */ - public List<NoopTopicSink> getNoopTopicSinks(); -} - - -/* - * ----------------- implementation ------------------- - */ - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported. - */ -class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked boolean. - */ - protected volatile boolean locked = false; - - /** - * Is this element alive boolean. - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSink> sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * Gets the endpoints. - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - final List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - NoopTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) { - return true; - } - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) { - return true; - } - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + List<NoopTopicSink> getNoopTopicSinks(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java new file mode 100644 index 00000000..9912761f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -0,0 +1,489 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation of the Topic Endpoint Manager, proxies operations to the appropriate + * implementation(s). + */ +class TopicEndpointProxy implements TopicEndpoint { + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(TopicEndpointProxy.class); + + /** + * Is this element locked boolean. + */ + private volatile boolean locked = false; + + /** + * Is this element alive boolean. + */ + private volatile boolean alive = false; + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + // 3. Create NOOP Sources + + List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + sources.addAll(NoopTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + // 3. Create NOOP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + sources.addAll(NoopTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) { + sources.add(uebSource); + } + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) { + sources.add(dmaapSource); + } + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + + try { + final TopicSource noopSource = this.getNoopTopicSource(topic); + if (noopSource != null) { + sources.add(noopSource); + } + } catch (final Exception e) { + logger.debug("No NOOP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) { + sinks.add(uebSink); + } + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) { + sinks.add(dmaapSink); + } + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) { + sinks.add(noopSink); + } + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw parmException(null); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @Override + public List<NoopTopicSource> getNoopTopicSources() { + return NoopTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over + * locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * Gets the endpoints. + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + this.stop(); + + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + + NoopTopicSink.factory.destroy(); + NoopTopicSource.factory.destroy(); + + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) { + return true; + } + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) { + return true; + } + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { + + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(null); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case NOOP: + return this.getNoopTopicSource(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(null); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public NoopTopicSource getNoopTopicSource(String topicName) { + return NoopTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + + private IllegalArgumentException parmException(String topicName) { + return new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + private void logNoSink(String topicName, Exception ex) { + logger.debug("No sink for topic: {}", topicName, ex); + } + +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 7e666417..4c36a39a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -398,7 +398,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { if (dmaapTopicWriters.containsKey(topic)) { return dmaapTopicWriters.get(topic); } else { - throw new IllegalArgumentException("DmaapTopicSink for " + topic + " not found"); + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index f45164f8..ae6c6c3b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -448,7 +448,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { if (dmaapTopicSources.containsKey(topic)) { return dmaapTopicSources.get(topic); } else { - throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); + throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java new file mode 100644 index 00000000..091e46bf --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java @@ -0,0 +1,144 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * No Operation topic endpoint. + */ +public abstract class NoopTopicEndpoint extends TopicBase { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(NoopTopicEndpoint.class); + + /** + * Network logger. + */ + private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); + + /** + * {@inheritDoc}. + */ + public NoopTopicEndpoint(List<String> servers, String topic) { + super(servers, topic); + } + + /** + * I/O. + * + * @param message message. + * @return true if sucessful. + */ + protected boolean io(String message) { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), + message); + + broadcast(message); + } catch (Exception e) { + logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); + return false; + } + + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return CommInfrastructure.NOOP; + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean start() { + logger.info("{}: starting", this); + + synchronized (this) { + + if (this.alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + this.alive = true; + } + + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean stop() { + logger.info("{}: stopping", this); + + synchronized (this) { + this.alive = false; + } + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public void shutdown() { + logger.info("{}: shutdown", this); + + this.stop(); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicEndpoint[" + super.toString() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java new file mode 100644 index 00000000..98b6ed6f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +/** + * Noop Topic Factory. + */ +public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends TopicBaseHashedFactory<T> { + + /** + * Get Topics Property Name. + * + * @return property name. + */ + protected abstract String getTopicsPropertyName(); + + /** + * {@inheritDoc}. + */ + @Override + protected List<String> getTopicNames(Properties properties) { + String topics = properties.getProperty(getTopicsPropertyName()); + if (topics == null || topics.isEmpty()) { + return new ArrayList<>(); + } + + return Arrays.asList(topics.split("\\s*,\\s*")); + } + + /** + * {@inheritDoc}. + */ + @Override + protected List<String> getServers(String topicName, Properties properties) { + String servers = + properties.getProperty(getTopicsPropertyName() + "." + topicName + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + servers = CommInfrastructure.NOOP.toString(); + } + + return new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } + + /** + * {@inheritDoc}. + */ + @Override + protected boolean isManaged(String topicName, Properties properties) { + String managedString = + properties.getProperty(getTopicsPropertyName() + + "." + topicName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + return managed; + } + + /** + * {@inheritDoc}. + */ + @Override + public T build(List<String> serverList, String topic, boolean managed) { + List<String> servers; + if (serverList == null || serverList.isEmpty()) { + servers = Collections.singletonList(CommInfrastructure.NOOP.toString()); + } else { + servers = serverList; + } + + return super.build(servers, topic, managed); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicFactory[ " + super.toString() + " ]"; + } +} + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java index 5a5f8fbc..f6ad433d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,112 +21,39 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; - import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * NOOP topic sink. + * No Operation Topic Sink. */ -public class NoopTopicSink extends TopicBase implements TopicSink { +public class NoopTopicSink extends NoopTopicEndpoint implements TopicSink { /** - * factory. + * Factory. */ - public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory(); + public static final NoopTopicSinkFactory factory = new NoopTopicSinkFactory(); /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class); - - /** - * net logger. - */ - private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); - - /** - * constructor. - * - * @param servers servers - * @param topic topic - * @throws IllegalArgumentException if an invalid argument has been passed in + * {@inheritDoc}. */ public NoopTopicSink(List<String> servers, String topic) { super(servers, topic); } + /** + * {@inheritDoc}. + */ @Override public boolean send(String message) { - - if (message == null || message.isEmpty()) { - throw new IllegalArgumentException("Message to send is empty"); - } - - if (!this.alive) { - throw new IllegalStateException(this + " is stopped"); - } - - try { - synchronized (this) { - this.recentEvents.add(message); - } - - netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), - message); - - broadcast(message); - } catch (Exception e) { - logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); - return false; - } - - return true; - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return CommInfrastructure.NOOP; - } - - @Override - public boolean start() { - logger.info("{}: starting", this); - - synchronized (this) { - - if (this.alive) { - return true; - } - - if (locked) { - throw new IllegalStateException(this + " is locked."); - } - - this.alive = true; - } - - return true; - } - - @Override - public boolean stop() { - synchronized (this) { - this.alive = false; - } - return true; - } - - @Override - public void shutdown() { - this.stop(); + return super.io(message); } + /** + * {@inheritDoc}. + */ @Override public String toString() { - return "NoopTopicSink [toString()=" + super.toString() + "]"; + return "NoopTopicSink[" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java index b2c50184..0c38d196 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,209 +20,37 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Properties; - import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Noop Topic Sink Factory. */ -public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files. - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - List<NoopTopicSink> build(Properties properties); - - /** - * builds a noop sink. - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - NoopTopicSink build(List<String> servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all sinks. - */ - void destroy(); - - /** - * gets a sink based on topic name. - * - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - NoopTopicSink get(String topic); +public class NoopTopicSinkFactory extends NoopTopicFactory<NoopTopicSink> { /** - * Provides a snapshot of the UEB Topic Writers. - * - * @return a list of the UEB Topic Writers + * {@inheritDoc}. */ - List<NoopTopicSink> inventory(); - -} - - -/* ------------- implementation ----------------- */ - -/** - * Factory of noop sinks. - */ -class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedNoopTopicSinkFactory.class); - - /** - * noop topic sinks map. - */ - protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); - @Override - public List<NoopTopicSink> build(Properties properties) { - - final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - final List<NoopTopicSink> newSinks = new ArrayList<>(); - synchronized (this) { - for (final String topic : sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - servers = "noop"; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - final NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List<String> servers, String topic, boolean managed) { - - List<String> noopSinkServers = servers; - if (noopSinkServers == null || noopSinkServers.isEmpty()) { - noopSinkServers = Arrays.asList("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } - - final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); - - if (managed) { - this.noopTopicSinks.put(topic, sink); - } - - return sink; - } + protected String getTopicsPropertyName() { + return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS; } + /** + * {@inheritDoc}. + */ @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - NoopTopicSink noopSink; - synchronized (this) { - if (!this.noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = this.noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); + protected NoopTopicSink build(List<String> servers, String topic) { + return new NoopTopicSink(servers, topic); } + /** + * {@inheritDoc}. + */ @Override - public void destroy() { - final List<NoopTopicSink> sinks = this.inventory(); - for (final NoopTopicSink sink : sinks) { - sink.shutdown(); - } - - synchronized (this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } + public String toString() { + return "NoopTopicSinkFactory [" + super.toString() + "]"; } - @Override - public List<NoopTopicSink> inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } } + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java new file mode 100644 index 00000000..c3215e04 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSource; + +/** + * No Operation Topic Source. + */ +public class NoopTopicSource extends NoopTopicEndpoint implements TopicSource { + + /** + * Factory. + */ + public static final NoopTopicSourceFactory factory = new NoopTopicSourceFactory(); + + /** + * {@inheritDoc}. + */ + public NoopTopicSource(List<String> servers, String topic) { + super(servers, topic); + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean offer(String event) { + return super.io(event); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicSource[" + super.toString() + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java new file mode 100644 index 00000000..9623b4fa --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +/** + * No Operation Topic Source Factory. + */ +public class NoopTopicSourceFactory extends NoopTopicFactory<NoopTopicSource> { + + /** + * {@inheritDoc}. + */ + @Override + protected String getTopicsPropertyName() { + return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS; + } + + /** + * {@inheritDoc}. + */ + @Override + protected NoopTopicSource build(List<String> servers, String topic) { + return new NoopTopicSource(servers, topic); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicSourceFactory[" + super.toString() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java new file mode 100644 index 00000000..897a8e19 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Topic Base Factory. + * + * @param <T> Type. + */ +public interface TopicBaseFactory<T extends Topic> { + + /** + * build a TopicBase instance. + * + * @param properties properties. + * @return T instance. + */ + List<T> build(Properties properties); + + /** + * build a TopicBase instance. + * + * @param servers servers. + * @param topic topic. + * @param managed managed. + * @return T instance. + */ + T build(List<String> servers, String topic, boolean managed); + + /** + * destroy TopicBase instance. + * @param topic topic. + */ + void destroy(String topic); + + /** + * destroy. + */ + void destroy(); + + /** + * get T instance. + * + * @param topic topic. + * @return T instance. + */ + T get(String topic); + + /** + * inventory of T instances. + * + * @return T instance list. + */ + List<T> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java new file mode 100644 index 00000000..f958bd01 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java @@ -0,0 +1,197 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Topic Factory implementation that indexes T instances in a hash table. + */ +public abstract class TopicBaseHashedFactory<T extends Topic> implements TopicBaseFactory<T> { + + protected static final String MISSING_TOPIC_MESSAGE = "A topic must be provided"; + protected static final String MISSING_SERVERS_MESSAGE = "Servers must be provided"; + + /** + * endpoints. + */ + protected final HashMap<String, T> endpoints = new HashMap<>(); + + /** + * get the topic names. + * + * @param properties properties. + * @return list of topic names. + */ + protected abstract List<String> getTopicNames(Properties properties); + + /** + * get the servers that this topic uses. + * + * @param topicName name. + * @param properties properties. + * @return list of servers. + */ + protected abstract List<String> getServers(String topicName, Properties properties); + + /** + * is this topic managed? + * + * @param topicName name. + * @param properties properties. + * @return if managed. + */ + protected abstract boolean isManaged(String topicName, Properties properties); + + /** + * construct an instance of an endpoint. + * + * @param servers servers, + * @param topic topic. + * @return an instance of T. + */ + protected abstract T build(List<String> servers, String topic); + + /** + * {@inheritDoc}. + */ + @Override + public List<T> build(Properties properties) { + List<String> topicNames = getTopicNames(properties); + if (topicNames == null || topicNames.isEmpty()) { + return Collections.emptyList(); + } + + List<T> newEndpoints = new ArrayList<>(); + synchronized (this) { + for (String name : topicNames) { + if (this.endpoints.containsKey(name)) { + newEndpoints.add(this.endpoints.get(name)); + continue; + } + + newEndpoints.add(this.build(getServers(name, properties), name, isManaged(name, properties))); + } + } + return newEndpoints; + } + + /** + * {@inheritDoc}. + */ + @Override + public T build(List<String> servers, String topic, boolean managed) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException(MISSING_SERVERS_MESSAGE); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + synchronized (this) { + if (this.endpoints.containsKey(topic)) { + return this.endpoints.get(topic); + } + + T endpoint = build(servers, topic); + if (managed) { + this.endpoints.put(topic, endpoint); + } + + return endpoint; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + T endpoint; + synchronized (this) { + if (!this.endpoints.containsKey(topic)) { + return; + } + + endpoint = this.endpoints.remove(topic); + } + endpoint.shutdown(); + } + + /** + * {@inheritDoc}. + */ + @Override + public void destroy() { + final List<T> snapshotEndpoints = this.inventory(); + for (final T snapshot : snapshotEndpoints) { + snapshot.shutdown(); + } + + synchronized (this) { + this.endpoints.clear(); + } + } + + /** + * {@inheritDoc}. + */ + @Override + public T get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + synchronized (this) { + if (this.endpoints.containsKey(topic)) { + return this.endpoints.get(topic); + } else { + throw new IllegalStateException(topic + " not found"); + } + } + } + + /** + * {@inheritDoc}. + */ + @Override + public List<T> inventory() { + return new ArrayList<>(this.endpoints.values()); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "TopicBaseHashedFactory[ " + super.toString() + " ]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index cc71748d..883ba7d2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -1,8 +1,8 @@ /*- * ============LICENSE_START======================================================= - * policy-core + * ONAP * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,7 @@ public interface PolicyEndPointProperties { String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX = ".dme2.sessionStickinessRequired"; + String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics"; String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics"; /* HTTP Server Properties */ @@ -98,5 +99,4 @@ public interface PolicyEndPointProperties { String PROPERTY_HTTP_CLIENT_SERVICES = "http.client.services"; String PROPERTY_HTTP_URL_SUFFIX = PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX; - } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java new file mode 100644 index 00000000..19dde432 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java @@ -0,0 +1,358 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Properties; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicPropertyBuilder; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class TopicEndpointProxyTest { + + private static final String NOOP_SOURCE_TOPIC = "noop-source"; + private static final String NOOP_SINK_TOPIC = "noop-sink"; + + private static final String UEB_SOURCE_TOPIC = "ueb-source"; + private static final String UEB_SINK_TOPIC = "ueb-sink"; + + private static final String DMAAP_SOURCE_TOPIC = "dmaap-source"; + private static final String DMAAP_SINK_TOPIC = "dmaap-sink"; + + private Properties configuration = new Properties(); + + /** + * Constructor. + */ + public TopicEndpointProxyTest() { + Properties noopSourceProperties = + new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS) + .makeTopic(NOOP_SOURCE_TOPIC).build(); + + Properties noopSinkProperties = + new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS) + .makeTopic(NOOP_SINK_TOPIC).build(); + + Properties uebSourceProperties = + new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS) + .makeTopic(UEB_SOURCE_TOPIC).build(); + + Properties uebSinkProperties = + new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS) + .makeTopic(UEB_SINK_TOPIC).build(); + + Properties dmaapSourceProperties = + new DmaapTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS) + .makeTopic(DMAAP_SOURCE_TOPIC).build(); + + Properties dmaapSinkProperties = + new DmaapTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS) + .makeTopic(DMAAP_SINK_TOPIC).build(); + + configuration.putAll(noopSourceProperties); + configuration.putAll(noopSinkProperties); + configuration.putAll(uebSourceProperties); + configuration.putAll(uebSinkProperties); + configuration.putAll(dmaapSourceProperties); + configuration.putAll(dmaapSinkProperties); + } + + private <T extends Topic> boolean exists(List<T> topics, String topicName) { + return topics.stream().map(Topic::getTopic).anyMatch(topicName::equals); + } + + private <T extends Topic> boolean allSources(List<T> topics) { + return exists(topics, NOOP_SOURCE_TOPIC) + && exists(topics, UEB_SOURCE_TOPIC) + && exists(topics, DMAAP_SOURCE_TOPIC); + } + + private <T extends Topic> boolean allSinks(List<T> topics) { + return exists(topics, NOOP_SINK_TOPIC) + && exists(topics, UEB_SINK_TOPIC) + && exists(topics, DMAAP_SINK_TOPIC); + } + + private <T extends Topic> boolean anySource(List<T> topics) { + return exists(topics, NOOP_SOURCE_TOPIC) + || exists(topics, UEB_SOURCE_TOPIC) + || exists(topics, DMAAP_SOURCE_TOPIC); + } + + private <T extends Topic> boolean anySink(List<T> topics) { + return exists(topics, NOOP_SINK_TOPIC) + || exists(topics, UEB_SINK_TOPIC) + || exists(topics, DMAAP_SINK_TOPIC); + } + + @Test + public void addTopicSources() { + TopicEndpoint manager = new TopicEndpointProxy(); + + List<TopicSource> sources = manager.addTopicSources(configuration); + assertSame(3, sources.size()); + + assertTrue(allSources(sources)); + assertFalse(anySink(sources)); + } + + @Test + public void addTopicSinks() { + TopicEndpoint manager = new TopicEndpointProxy(); + + List<TopicSink> sinks = manager.addTopicSinks(configuration); + assertSame(3, sinks.size()); + + assertFalse(anySource(sinks)); + assertTrue(allSinks(sinks)); + } + + @Test + public void getTopicSources() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSources(configuration); + manager.addTopicSinks(configuration); + + List<TopicSource> sources = manager.getTopicSources(); + assertSame(3, sources.size()); + + assertTrue(allSources(sources)); + assertFalse(anySink(sources)); + } + + @Test + public void getTopicSinks() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSources(configuration); + manager.addTopicSinks(configuration); + + List<TopicSink> sinks = manager.getTopicSinks(); + assertSame(3, sinks.size()); + + assertFalse(anySource(sinks)); + assertTrue(allSinks(sinks)); + } + + @Test + public void getUebTopicSources() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSources(configuration); + assertSame(1, manager.getUebTopicSources().size()); + } + + @Test + public void getDmaapTopicSources() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSources(configuration); + assertSame(1, manager.getDmaapTopicSources().size()); + } + + @Test + public void getNoopTopicSources() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSources(configuration); + assertSame(1, manager.getNoopTopicSources().size()); + } + + @Test + public void getUebTopicSinks() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSinks(configuration); + assertSame(1, manager.getUebTopicSinks().size()); + } + + @Test + public void getDmaapTopicSinks() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSinks(configuration); + assertSame(1, manager.getDmaapTopicSinks().size()); + } + + @Test + public void getNoopTopicSinks() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.addTopicSinks(configuration); + assertSame(1, manager.getNoopTopicSinks().size()); + } + + @Test + public void lifecycle() { + TopicEndpoint manager = new TopicEndpointProxy(); + + assertTrue(manager.start()); + assertTrue(manager.isAlive()); + + assertTrue(manager.stop()); + assertFalse(manager.isAlive()); + + assertTrue(manager.start()); + assertTrue(manager.isAlive()); + + manager.shutdown(); + assertFalse(manager.isAlive()); + } + + @Test + public void lock() { + TopicEndpoint manager = new TopicEndpointProxy(); + + manager.lock(); + assertTrue(manager.isLocked()); + + manager.unlock(); + assertFalse(manager.isLocked()); + } + + @Test + public void getTopicSource() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSources(configuration); + + assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic()); + assertSame(UEB_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.UEB, UEB_SOURCE_TOPIC).getTopic()); + assertSame(DMAAP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.DMAAP, DMAAP_SOURCE_TOPIC).getTopic()); + + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC)); + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.UEB, UEB_SINK_TOPIC)); + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.DMAAP, DMAAP_SINK_TOPIC)); + } + + @Test + public void getTopicSink() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSinks(configuration); + + assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic()); + assertSame(UEB_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.UEB, UEB_SINK_TOPIC).getTopic()); + assertSame(DMAAP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.DMAAP, DMAAP_SINK_TOPIC).getTopic()); + + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC)); + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.UEB, UEB_SOURCE_TOPIC)); + assertThatIllegalStateException() + .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.DMAAP, DMAAP_SOURCE_TOPIC)); + } + + @Test + public void getUebTopicSource() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSources(configuration); + + assertSame(UEB_SOURCE_TOPIC, manager.getUebTopicSource(UEB_SOURCE_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(NOOP_SOURCE_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(DMAAP_SOURCE_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource("")); + } + + @Test + public void getUebTopicSink() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSinks(configuration); + + assertSame(UEB_SINK_TOPIC, manager.getUebTopicSink(UEB_SINK_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(NOOP_SINK_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(DMAAP_SINK_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink("")); + } + + @Test + public void getDmaapTopicSource() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSources(configuration); + + assertSame(DMAAP_SOURCE_TOPIC, manager.getDmaapTopicSource(DMAAP_SOURCE_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSource(NOOP_SOURCE_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSource(UEB_SOURCE_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSource(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSource("")); + } + + @Test + public void getDmaapTopicSink() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSinks(configuration); + + assertSame(DMAAP_SINK_TOPIC, manager.getDmaapTopicSink(DMAAP_SINK_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSink(NOOP_SINK_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSink(UEB_SINK_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSink(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSink("")); + } + + + @Test + public void getNoopTopicSource() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSources(configuration); + + assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(DMAAP_SOURCE_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(UEB_SOURCE_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource("")); + } + + @Test + public void getNoopTopicSink() { + TopicEndpoint manager = new TopicEndpointProxy(); + manager.addTopicSinks(configuration); + + assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic()); + + assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(DMAAP_SINK_TOPIC)); + assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(UEB_SINK_TOPIC)); + + assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null)); + assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink("")); + } +}
\ No newline at end of file diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java index 877246e2..67304f91 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.junit.Assert.assertEquals; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX; @@ -126,6 +127,6 @@ public abstract class DmaapTopicFactoryTestBase<T extends Topic> extends BusTopi initFactory(); buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); - assertThatIllegalArgumentException().as("unknown topic").isThrownBy(() -> getTopic(TOPIC2)); + assertThatIllegalStateException().as("unknown topic").isThrownBy(() -> getTopic(TOPIC2)); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java new file mode 100644 index 00000000..5bbc80cd --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java @@ -0,0 +1,117 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.Collections; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; + +public abstract class NoopTopicEndpointTest<F extends NoopTopicFactory<T>, T extends NoopTopicEndpoint> + extends TopicTestBase { + + protected final F factory; + protected T endpoint; + + protected abstract boolean io(String message); + + public NoopTopicEndpointTest(F factory) { + this.factory = factory; + } + + @Before + public void setUp() { + super.setUp(); + this.endpoint = this.factory.build(servers, MY_TOPIC); + } + + @Test + public void tesIo() { + TopicListener listener = mock(TopicListener.class); + this.endpoint.register(listener); + this.endpoint.start(); + + assertTrue(io(MY_MESSAGE)); + assertSame(MY_MESSAGE, this.endpoint.getRecentEvents()[0]); + assertEquals(Collections.singletonList(MY_MESSAGE), Arrays.asList(this.endpoint.getRecentEvents())); + verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE); + + this.endpoint.unregister(listener); + } + + @Test(expected = IllegalArgumentException.class) + public void testIoNullMessage() { + io(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testIoEmptyMessage() { + io(""); + } + + @Test(expected = IllegalStateException.class) + public void testOfferNotStarted() { + io(MY_MESSAGE); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.NOOP, this.endpoint.getTopicCommInfrastructure()); + } + + @Test + public void testStart_testStop_testShutdown() { + this.endpoint.start(); + assertTrue(this.endpoint.isAlive()); + + // start again + this.endpoint.start(); + assertTrue(this.endpoint.isAlive()); + + // stop + this.endpoint.stop(); + assertFalse(this.endpoint.isAlive()); + + // re-start again + this.endpoint.start(); + assertTrue(this.endpoint.isAlive()); + + // shutdown + this.endpoint.shutdown(); + assertFalse(this.endpoint.isAlive()); + } + + @Test(expected = IllegalStateException.class) + public void testStart_Locked() { + this.endpoint.lock(); + this.endpoint.start(); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java new file mode 100644 index 00000000..16d9e539 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java @@ -0,0 +1,224 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; + +public abstract class NoopTopicFactoryTest<F extends NoopTopicFactory<T>, T extends NoopTopicEndpoint> + extends TopicFactoryTestBase<T> { + + private static final List<String> NOOP_SERVERS = Arrays.asList(CommInfrastructure.NOOP.toString()); + private F factory = null; + + protected abstract F buildFactory(); + + /** + * Creates the object to be tested. + */ + @Before + public void setUp() { + super.setUp(); + initFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + public void testBuildListOfStringStringBoolean() { + initFactory(); + + T item1 = buildTopic(servers, MY_TOPIC, true); + assertNotNull(item1); + + assertEquals(servers, item1.getServers()); + assertEquals(MY_TOPIC, item1.getTopic()); + + // managed topic - should not build a new one + assertEquals(item1, buildTopic(servers, MY_TOPIC, true)); + + T item2 = buildTopic(servers, TOPIC2, true); + assertNotNull(item2); + assertNotSame(item1, item2); + + // duplicate - should be the same, as these topics are managed + List<String> randomServers = new ArrayList<>(); + randomServers.add(RandomStringUtils.randomAlphanumeric(8)); + T item3 = buildTopic(randomServers, TOPIC2, true); + assertSame(item2, item3); + + T item4 = buildTopic(Collections.emptyList(), TOPIC2, true); + assertSame(item3, item4); + + // null server list + initFactory(); + assertEquals(NOOP_SERVERS, buildTopic(null, MY_TOPIC, true).getServers()); + + // empty server list + initFactory(); + assertEquals(NOOP_SERVERS, buildTopic(Collections.emptyList(), MY_TOPIC, true).getServers()); + + // unmanaged topic + initFactory(); + item1 = buildTopic(servers, MY_TOPIC, false); + assertNotSame(item1, buildTopic(servers, MY_TOPIC, false)); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildListOfStringStringBoolean_NullTopic() { + buildTopic(servers, null, true); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildListOfStringStringBoolean_EmptyTopic() { + buildTopic(servers, "", true); + } + + @Test + public void testBuildProperties() { + // managed topic + initFactory(); + assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); + assertNotNull(factory.get(MY_TOPIC)); + + // unmanaged topic - get() will throw an exception + initFactory(); + assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC) + .setTopicProperty(PROPERTY_MANAGED_SUFFIX, "false").build()).size()); + assertThatThrownBy(() -> factory.get(MY_TOPIC)); + + // managed undefined - default to true + initFactory(); + assertEquals(1, buildTopics( + makePropBuilder().makeTopic(MY_TOPIC).removeTopicProperty(PROPERTY_MANAGED_SUFFIX).build()) + .size()); + assertNotNull(factory.get(MY_TOPIC)); + + // managed empty - default to true + initFactory(); + assertEquals(1, buildTopics( + makePropBuilder().makeTopic(MY_TOPIC).setTopicProperty(PROPERTY_MANAGED_SUFFIX, "").build()) + .size()); + assertNotNull(factory.get(MY_TOPIC)); + + initFactory(); + + // null topic list + assertTrue(buildTopics(makePropBuilder().build()).isEmpty()); + + // empty topic list + assertTrue(buildTopics(makePropBuilder().addTopic("").build()).isEmpty()); + + // null server list + initFactory(); + T endpoint = buildTopics(makePropBuilder().makeTopic(MY_TOPIC) + .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).get(0); + assertEquals(NOOP_SERVERS, endpoint.getServers()); + + // empty server list + initFactory(); + endpoint = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, "") + .build()).get(0); + assertEquals(NOOP_SERVERS, endpoint.getServers()); + + // test other options + super.testBuildProperties_Multiple(); + } + + @Test + public void testDestroyString_testGet_testInventory() { + super.testDestroyString_testGet_testInventory(); + super.testDestroyString_Ex(); + } + + @Test + public void testDestroy() { + super.testDestroy(); + } + + @Test + public void testGet() { + super.testGet_Ex(); + } + + @Override + protected void initFactory() { + if (factory != null) { + factory.destroy(); + } + + factory = buildFactory(); + } + + @Override + protected List<T> buildTopics(Properties properties) { + return factory.build(properties); + } + + protected T buildTopic(List<String> servers, String topic, boolean managed) { + return factory.build(servers, topic, managed); + } + + @Override + protected void destroyFactory() { + factory.destroy(); + } + + @Override + protected void destroyTopic(String topic) { + factory.destroy(topic); + } + + @Override + protected List<T> getInventory() { + return factory.inventory(); + } + + @Override + protected T getTopic(String topic) { + return factory.get(topic); + } + + @Override + protected TopicPropertyBuilder makePropBuilder() { + return new NoopTopicPropertyBuilder(factory.getTopicsPropertyName()); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactoryTest.java index 82a9df4c..cc44716f 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactoryTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,195 +20,10 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class NoopTopicSinkFactoryTest extends TopicFactoryTestBase<NoopTopicSink> { - - private static final List<String> NOOP_SERVERS = Arrays.asList("noop"); - - private IndexedNoopTopicSinkFactory factory; - - /** - * Creates the object to be tested. - */ - @Before - public void setUp() { - super.setUp(); - - factory = new IndexedNoopTopicSinkFactory(); - } - - @After - public void tearDown() { - factory.destroy(); - } - - @Test - public void testBuildListOfStringStringBoolean() { - initFactory(); - - NoopTopicSink item1 = buildTopic(servers, MY_TOPIC, true); - assertNotNull(item1); - - assertEquals(servers, item1.getServers()); - assertEquals(MY_TOPIC, item1.getTopic()); - - // managed topic - should not build a new one - assertEquals(item1, buildTopic(servers, MY_TOPIC, true)); - - NoopTopicSink item2 = buildTopic(servers, TOPIC2, true); - assertNotNull(item2); - assertTrue(item1 != item2); - - // duplicate - should be the same, as these topics are managed - NoopTopicSink item3 = buildTopic(Collections.emptyList(), TOPIC2, true); - assertTrue(item2 == item3); - - // null server list - initFactory(); - assertEquals(NOOP_SERVERS, buildTopic(null, MY_TOPIC, true).getServers()); - - // empty server list - initFactory(); - assertEquals(NOOP_SERVERS, buildTopic(Collections.emptyList(), MY_TOPIC, true).getServers()); - - // unmanaged topic - initFactory(); - item1 = buildTopic(servers, MY_TOPIC, false); - assertTrue(item1 != buildTopic(servers, MY_TOPIC, false)); - } - - @Test(expected = IllegalArgumentException.class) - public void testBuildListOfStringStringBoolean_NullTopic() { - buildTopic(servers, null, true); - } - - @Test(expected = IllegalArgumentException.class) - public void testBuildListOfStringStringBoolean_EmptyTopic() { - buildTopic(servers, "", true); - } - - @Test - public void testBuildProperties() { - // managed topic - initFactory(); - assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); - assertNotNull(factory.get(MY_TOPIC)); - - // unmanaged topic - get() will throw an exception - initFactory(); - assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC) - .setTopicProperty(PROPERTY_MANAGED_SUFFIX, "false").build()).size()); - assertThatThrownBy(() -> factory.get(MY_TOPIC)); - - // managed undefined - default to true - initFactory(); - assertEquals(1, buildTopics( - makePropBuilder().makeTopic(MY_TOPIC).removeTopicProperty(PROPERTY_MANAGED_SUFFIX).build()) - .size()); - assertNotNull(factory.get(MY_TOPIC)); - - // managed empty - default to true - initFactory(); - assertEquals(1, buildTopics( - makePropBuilder().makeTopic(MY_TOPIC).setTopicProperty(PROPERTY_MANAGED_SUFFIX, "").build()) - .size()); - assertNotNull(factory.get(MY_TOPIC)); - - initFactory(); - - // null topic list - assertTrue(buildTopics(makePropBuilder().build()).isEmpty()); - - // empty topic list - assertTrue(buildTopics(makePropBuilder().addTopic("").build()).isEmpty()); - - // null server list - initFactory(); - NoopTopicSink sink = buildTopics(makePropBuilder().makeTopic(MY_TOPIC) - .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).get(0); - assertEquals(NOOP_SERVERS, sink.getServers()); - - // empty server list - initFactory(); - sink = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, "") - .build()).get(0); - assertEquals(NOOP_SERVERS, sink.getServers()); - - // test other options - super.testBuildProperties_Multiple(); - } - - @Test - public void testDestroyString_testGet_testInventory() { - super.testDestroyString_testGet_testInventory(); - super.testDestroyString_Ex(); - } - - @Test - public void testDestroy() { - super.testDestroy(); - } - - @Test - public void testGet() { - super.testGet_Ex(); - } - - @Override - protected void initFactory() { - if (factory != null) { - factory.destroy(); - } - - factory = new IndexedNoopTopicSinkFactory(); - } - - @Override - protected List<NoopTopicSink> buildTopics(Properties properties) { - return factory.build(properties); - } - - protected NoopTopicSink buildTopic(List<String> servers, String topic, boolean managed) { - return factory.build(servers, topic, managed); - } - - @Override - protected void destroyFactory() { - factory.destroy(); - } - - @Override - protected void destroyTopic(String topic) { - factory.destroy(topic); - } - - @Override - protected List<NoopTopicSink> getInventory() { - return factory.inventory(); - } - - @Override - protected NoopTopicSink getTopic(String topic) { - return factory.get(topic); - } +public class NoopTopicSinkFactoryTest extends NoopTopicFactoryTest<NoopTopicSinkFactory, NoopTopicSink> { @Override - protected TopicPropertyBuilder makePropBuilder() { - return new NoopTopicPropertyBuilder(PROPERTY_NOOP_SINK_TOPICS); + protected NoopTopicSinkFactory buildFactory() { + return new NoopTopicSinkFactory(); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java index 8a5b7b2e..02478367 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,50 +20,30 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import java.util.Arrays; -import org.junit.Before; import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -public class NoopTopicSinkTest extends TopicTestBase { +public class NoopTopicSinkTest extends NoopTopicEndpointTest<NoopTopicSinkFactory, NoopTopicSink> { - private NoopTopicSink sink; - - /** - * Creates the object to be tested. - */ - @Before - public void setUp() { - super.setUp(); + public NoopTopicSinkTest() { + super(new NoopTopicSinkFactory()); + } - sink = new NoopTopicSink(servers, MY_TOPIC); + @Override + protected boolean io(String message) { + return endpoint.send(message); } @Test public void testToString() { - assertTrue(sink.toString().startsWith("NoopTopicSink [")); + assertThat(endpoint.toString()).startsWith("NoopTopicSink"); } @Test public void testSend() { - TopicListener listener = mock(TopicListener.class); - sink.register(listener); - sink.start(); - - assertTrue(sink.send(MY_MESSAGE)); - - assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); - verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE); - - // generate exception during broadcast - sink = new NoopTopicSink(servers, MY_TOPIC) { + NoopTopicSink sink = new NoopTopicSink(servers, MY_TOPIC) { @Override protected boolean broadcast(String message) { throw new RuntimeException(EXPECTED); @@ -74,53 +54,4 @@ public class NoopTopicSinkTest extends TopicTestBase { sink.start(); assertFalse(sink.send(MY_MESSAGE)); } - - @Test(expected = IllegalArgumentException.class) - public void testSend_NullMessage() { - sink.send(null); - } - - @Test(expected = IllegalArgumentException.class) - public void testSend_EmptyMessage() { - sink.send(""); - } - - @Test(expected = IllegalStateException.class) - public void testSend_NotStarted() { - sink.send(MY_MESSAGE); - } - - @Test - public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.NOOP, sink.getTopicCommInfrastructure()); - } - - @Test - public void testStart_testStop_testShutdown() { - sink.start(); - assertTrue(sink.isAlive()); - - // start again - sink.start(); - assertTrue(sink.isAlive()); - - // stop - sink.stop(); - assertFalse(sink.isAlive()); - - // re-start again - sink.start(); - assertTrue(sink.isAlive()); - - // shutdown - sink.shutdown(); - assertFalse(sink.isAlive()); - } - - @Test(expected = IllegalStateException.class) - public void testStart_Locked() { - sink.lock(); - sink.start(); - } - } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactoryTest.java new file mode 100644 index 00000000..c8a44292 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactoryTest.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +public class NoopTopicSourceFactoryTest extends NoopTopicFactoryTest<NoopTopicSourceFactory, NoopTopicSource> { + + @Override + protected NoopTopicSourceFactory buildFactory() { + return new NoopTopicSourceFactory(); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceTest.java new file mode 100644 index 00000000..22ccb9a8 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceTest.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class NoopTopicSourceTest extends NoopTopicEndpointTest<NoopTopicSourceFactory, NoopTopicSource> { + + public NoopTopicSourceTest() { + super(new NoopTopicSourceFactory()); + } + + @Override + protected boolean io(String message) { + return this.endpoint.offer(message); + } + + @Test + public void testToString() { + assertTrue(this.endpoint.toString().startsWith("NoopTopicSource")); + } + + @Test + public void testOffer() { + NoopTopicSource source = new NoopTopicSource(servers, MY_TOPIC) { + @Override + protected boolean broadcast(String message) { + throw new RuntimeException(EXPECTED); + } + + }; + + source.start(); + assertFalse(source.offer(MY_MESSAGE)); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/http/server/test/NoopTopicTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/http/server/test/NoopTopicTest.java deleted file mode 100644 index 388524a9..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/http/server/test/NoopTopicTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.http.server.test; - -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.Properties; - -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * NOOP Endpoint Tests. - */ -public class NoopTopicTest implements TopicListener { - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(NoopTopicTest.class); - - private final String topicName = "junit-noop"; - private final String outMessage = "blah"; - private String inMessage = null; - - @Test - public void testNoopEndpoint() { - logger.info("-- testNoopEndpoint() --"); - - Properties noopSinkProperties = new Properties(); - noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, topicName); - - List<? extends TopicSink> noopTopics = TopicEndpoint.manager.addTopicSinks(noopSinkProperties); - - TopicSink sink = NoopTopicSink.factory.get(topicName); - - assertTrue(noopTopics.size() == 1); - assertTrue(noopTopics.size() == NoopTopicSink.factory.inventory().size()); - assertTrue(noopTopics.get(0) == sink); - assertTrue(sink == NoopTopicSink.factory.inventory().get(0)); - - assertTrue(!sink.isAlive()); - - boolean badState = false; - try { - sink.send(outMessage); - } catch (IllegalStateException e) { - badState = true; - } - assertTrue(badState); - - sink.start(); - assertTrue(sink.isAlive()); - - sink.send(outMessage); - assertTrue(sink.getRecentEvents().length == 1); - assertTrue(sink.getRecentEvents()[0].equals(outMessage)); - assertTrue(this.inMessage == null); - - sink.register(this); - sink.send(this.outMessage); - assertTrue(outMessage.equals(this.inMessage)); - this.inMessage = null; - - sink.unregister(this); - sink.send(this.outMessage); - assertTrue(!outMessage.equals(this.inMessage)); - - sink.stop(); - try { - sink.send(outMessage); - } catch (IllegalStateException e) { - badState = true; - } - assertTrue(badState); - - NoopTopicSink.factory.destroy(topicName); - assertTrue(NoopTopicSink.factory.inventory().size() == 0); - } - - @Override - public void onTopicEvent(CommInfrastructure commType, String topic, String event) { - if (commType != CommInfrastructure.NOOP) { - return; - } - - if (topic == null || !topic.equals(topicName)) { - return; - } - - this.inMessage = event; - } -} |