diff options
author | Jorge Hernandez <jorge.hernandez-herrero@att.com> | 2019-01-10 17:24:53 -0600 |
---|---|---|
committer | Jorge Hernandez <jorge.hernandez-herrero@att.com> | 2019-01-11 20:37:19 -0600 |
commit | 55f5c4dc9e130e48a25b048e1f3091b10c17e365 (patch) | |
tree | 2db4bf0af7a00a91f207137afff93e46ac8b2942 /policy-endpoints/src/main | |
parent | 2fa29d8632a6dc9fb855a732320b679d724f384f (diff) |
Adding NOOP sources support
In addition, Noop* classes have been refactored to
increase code reuse and clean some checkstyle issues.
Additional Junits have been added for existing functionality.
Change-Id: I072f9ff2f415630ac82eca949a8360249f73da86
Issue-ID: POLICY-1397
Signed-off-by: Jorge Hernandez <jorge.hernandez-herrero@att.com>
Diffstat (limited to 'policy-endpoints/src/main')
13 files changed, 1207 insertions, 739 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index f4080b29..52c1f07b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,16 @@ package org.onap.policy.common.endpoints.event.comm; -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.ArrayList; import java.util.List; import java.util.Properties; - import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -45,7 +40,7 @@ public interface TopicEndpoint extends Startable, Lockable { /** * singleton for global access. */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + TopicEndpoint manager = new TopicEndpointProxy(); /** * Add Topic Sources to the communication infrastructure initialized per properties. @@ -54,7 +49,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Source * @throws IllegalArgumentException when invalid arguments are provided */ - public List<TopicSource> addTopicSources(Properties properties); + List<TopicSource> addTopicSources(Properties properties); /** * Add Topic Sinks to the communication infrastructure initialized per properties. @@ -63,7 +58,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Sink * @throws IllegalArgumentException when invalid arguments are provided */ - public List<TopicSink> addTopicSinks(Properties properties); + List<TopicSink> addTopicSinks(Properties properties); /** * Gets all Topic Sources. @@ -81,7 +76,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalStateException if the entity is in an invalid state * @throws IllegalArgumentException if invalid parameters are present */ - public List<TopicSource> getTopicSources(List<String> topicNames); + List<TopicSource> getTopicSources(List<String> topicNames); /** * Gets the Topic Source for the given topic name and underlying communication infrastructure @@ -96,7 +91,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalArgumentException if invalid parameters are present * @throws UnsupportedOperationException if the operation is not supported. */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); + TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -108,7 +103,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource getUebTopicSource(String topicName); + UebTopicSource getUebTopicSource(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -120,7 +115,15 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource getDmaapTopicSource(String topicName); + DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * Get the Noop Source for the given topic name. + * + * @param topicName the topic name. + * @return the Noop Source. + */ + NoopTopicSource getNoopTopicSource(String topicName); /** * Get the Topic Sinks for the given topic name. @@ -128,7 +131,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @param topicNames the topic names * @return the Topic Sink List */ - public List<TopicSink> getTopicSinks(List<String> topicNames); + List<TopicSink> getTopicSinks(List<String> topicNames); /** * Get the Topic Sinks for the given topic name and all the underlying communication @@ -141,14 +144,14 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public List<TopicSink> getTopicSinks(String topicName); + List<TopicSink> getTopicSinks(String topicName); /** * Gets all Topic Sinks. * * @return the Topic Sink List */ - public List<TopicSink> getTopicSinks(); + List<TopicSink> getTopicSinks(); /** * Get the Topic Sinks for the given topic name and underlying communication infrastructure type. @@ -161,7 +164,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); + TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -173,7 +176,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSink getUebTopicSink(String topicName); + UebTopicSink getUebTopicSink(String topicName); /** * Get the no-op Topic Sink for the given topic name. @@ -185,7 +188,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public NoopTopicSink getNoopTopicSink(String topicName); + NoopTopicSink getNoopTopicSink(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -197,469 +200,47 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink getDmaapTopicSink(String topicName); + DmaapTopicSink getDmaapTopicSink(String topicName); /** * Gets only the UEB Topic Sources. * * @return the UEB Topic Source List */ - public List<UebTopicSource> getUebTopicSources(); + List<UebTopicSource> getUebTopicSources(); /** * Gets only the DMAAP Topic Sources. * * @return the DMAAP Topic Source List */ - public List<DmaapTopicSource> getDmaapTopicSources(); + List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * Gets only the NOOP Topic Sources. + * + * @return the NOOP Topic Source List + */ + List<NoopTopicSource> getNoopTopicSources(); /** * Gets only the UEB Topic Sinks. * * @return the UEB Topic Sink List */ - public List<UebTopicSink> getUebTopicSinks(); + List<UebTopicSink> getUebTopicSinks(); /** * Gets only the DMAAP Topic Sinks. * * @return the DMAAP Topic Sink List */ - public List<DmaapTopicSink> getDmaapTopicSinks(); + List<DmaapTopicSink> getDmaapTopicSinks(); /** * Gets only the NOOP Topic Sinks. * * @return the NOOP Topic Sinks List */ - public List<NoopTopicSink> getNoopTopicSinks(); -} - - -/* - * ----------------- implementation ------------------- - */ - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported. - */ -class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked boolean. - */ - protected volatile boolean locked = false; - - /** - * Is this element alive boolean. - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSink> sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * Gets the endpoints. - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - final List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - NoopTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) { - return true; - } - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) { - return true; - } - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + List<NoopTopicSink> getNoopTopicSinks(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java new file mode 100644 index 00000000..9912761f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -0,0 +1,489 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation of the Topic Endpoint Manager, proxies operations to the appropriate + * implementation(s). + */ +class TopicEndpointProxy implements TopicEndpoint { + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(TopicEndpointProxy.class); + + /** + * Is this element locked boolean. + */ + private volatile boolean locked = false; + + /** + * Is this element alive boolean. + */ + private volatile boolean alive = false; + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + // 3. Create NOOP Sources + + List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + sources.addAll(NoopTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + // 3. Create NOOP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + sources.addAll(NoopTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) { + sources.add(uebSource); + } + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) { + sources.add(dmaapSource); + } + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + + try { + final TopicSource noopSource = this.getNoopTopicSource(topic); + if (noopSource != null) { + sources.add(noopSource); + } + } catch (final Exception e) { + logger.debug("No NOOP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) { + sinks.add(uebSink); + } + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) { + sinks.add(dmaapSink); + } + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) { + sinks.add(noopSink); + } + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw parmException(null); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @Override + public List<NoopTopicSource> getNoopTopicSources() { + return NoopTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over + * locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * Gets the endpoints. + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + this.stop(); + + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + + NoopTopicSink.factory.destroy(); + NoopTopicSource.factory.destroy(); + + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) { + return true; + } + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) { + return true; + } + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { + + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(null); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case NOOP: + return this.getNoopTopicSource(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(null); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public NoopTopicSource getNoopTopicSource(String topicName) { + return NoopTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + + private IllegalArgumentException parmException(String topicName) { + return new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + private void logNoSink(String topicName, Exception ex) { + logger.debug("No sink for topic: {}", topicName, ex); + } + +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 7e666417..4c36a39a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -398,7 +398,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { if (dmaapTopicWriters.containsKey(topic)) { return dmaapTopicWriters.get(topic); } else { - throw new IllegalArgumentException("DmaapTopicSink for " + topic + " not found"); + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index f45164f8..ae6c6c3b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -448,7 +448,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { if (dmaapTopicSources.containsKey(topic)) { return dmaapTopicSources.get(topic); } else { - throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); + throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java new file mode 100644 index 00000000..091e46bf --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java @@ -0,0 +1,144 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * No Operation topic endpoint. + */ +public abstract class NoopTopicEndpoint extends TopicBase { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(NoopTopicEndpoint.class); + + /** + * Network logger. + */ + private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); + + /** + * {@inheritDoc}. + */ + public NoopTopicEndpoint(List<String> servers, String topic) { + super(servers, topic); + } + + /** + * I/O. + * + * @param message message. + * @return true if sucessful. + */ + protected boolean io(String message) { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), + message); + + broadcast(message); + } catch (Exception e) { + logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); + return false; + } + + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return CommInfrastructure.NOOP; + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean start() { + logger.info("{}: starting", this); + + synchronized (this) { + + if (this.alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + this.alive = true; + } + + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean stop() { + logger.info("{}: stopping", this); + + synchronized (this) { + this.alive = false; + } + return true; + } + + /** + * {@inheritDoc}. + */ + @Override + public void shutdown() { + logger.info("{}: shutdown", this); + + this.stop(); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicEndpoint[" + super.toString() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java new file mode 100644 index 00000000..98b6ed6f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +/** + * Noop Topic Factory. + */ +public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends TopicBaseHashedFactory<T> { + + /** + * Get Topics Property Name. + * + * @return property name. + */ + protected abstract String getTopicsPropertyName(); + + /** + * {@inheritDoc}. + */ + @Override + protected List<String> getTopicNames(Properties properties) { + String topics = properties.getProperty(getTopicsPropertyName()); + if (topics == null || topics.isEmpty()) { + return new ArrayList<>(); + } + + return Arrays.asList(topics.split("\\s*,\\s*")); + } + + /** + * {@inheritDoc}. + */ + @Override + protected List<String> getServers(String topicName, Properties properties) { + String servers = + properties.getProperty(getTopicsPropertyName() + "." + topicName + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + servers = CommInfrastructure.NOOP.toString(); + } + + return new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } + + /** + * {@inheritDoc}. + */ + @Override + protected boolean isManaged(String topicName, Properties properties) { + String managedString = + properties.getProperty(getTopicsPropertyName() + + "." + topicName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + return managed; + } + + /** + * {@inheritDoc}. + */ + @Override + public T build(List<String> serverList, String topic, boolean managed) { + List<String> servers; + if (serverList == null || serverList.isEmpty()) { + servers = Collections.singletonList(CommInfrastructure.NOOP.toString()); + } else { + servers = serverList; + } + + return super.build(servers, topic, managed); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicFactory[ " + super.toString() + " ]"; + } +} + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java index 5a5f8fbc..f6ad433d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,112 +21,39 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; - import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * NOOP topic sink. + * No Operation Topic Sink. */ -public class NoopTopicSink extends TopicBase implements TopicSink { +public class NoopTopicSink extends NoopTopicEndpoint implements TopicSink { /** - * factory. + * Factory. */ - public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory(); + public static final NoopTopicSinkFactory factory = new NoopTopicSinkFactory(); /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class); - - /** - * net logger. - */ - private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); - - /** - * constructor. - * - * @param servers servers - * @param topic topic - * @throws IllegalArgumentException if an invalid argument has been passed in + * {@inheritDoc}. */ public NoopTopicSink(List<String> servers, String topic) { super(servers, topic); } + /** + * {@inheritDoc}. + */ @Override public boolean send(String message) { - - if (message == null || message.isEmpty()) { - throw new IllegalArgumentException("Message to send is empty"); - } - - if (!this.alive) { - throw new IllegalStateException(this + " is stopped"); - } - - try { - synchronized (this) { - this.recentEvents.add(message); - } - - netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), - message); - - broadcast(message); - } catch (Exception e) { - logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); - return false; - } - - return true; - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return CommInfrastructure.NOOP; - } - - @Override - public boolean start() { - logger.info("{}: starting", this); - - synchronized (this) { - - if (this.alive) { - return true; - } - - if (locked) { - throw new IllegalStateException(this + " is locked."); - } - - this.alive = true; - } - - return true; - } - - @Override - public boolean stop() { - synchronized (this) { - this.alive = false; - } - return true; - } - - @Override - public void shutdown() { - this.stop(); + return super.io(message); } + /** + * {@inheritDoc}. + */ @Override public String toString() { - return "NoopTopicSink [toString()=" + super.toString() + "]"; + return "NoopTopicSink[" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java index b2c50184..0c38d196 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,209 +20,37 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Properties; - import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Noop Topic Sink Factory. */ -public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files. - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - List<NoopTopicSink> build(Properties properties); - - /** - * builds a noop sink. - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - NoopTopicSink build(List<String> servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all sinks. - */ - void destroy(); - - /** - * gets a sink based on topic name. - * - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - NoopTopicSink get(String topic); +public class NoopTopicSinkFactory extends NoopTopicFactory<NoopTopicSink> { /** - * Provides a snapshot of the UEB Topic Writers. - * - * @return a list of the UEB Topic Writers + * {@inheritDoc}. */ - List<NoopTopicSink> inventory(); - -} - - -/* ------------- implementation ----------------- */ - -/** - * Factory of noop sinks. - */ -class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedNoopTopicSinkFactory.class); - - /** - * noop topic sinks map. - */ - protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); - @Override - public List<NoopTopicSink> build(Properties properties) { - - final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - final List<NoopTopicSink> newSinks = new ArrayList<>(); - synchronized (this) { - for (final String topic : sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - servers = "noop"; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - final NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List<String> servers, String topic, boolean managed) { - - List<String> noopSinkServers = servers; - if (noopSinkServers == null || noopSinkServers.isEmpty()) { - noopSinkServers = Arrays.asList("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } - - final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); - - if (managed) { - this.noopTopicSinks.put(topic, sink); - } - - return sink; - } + protected String getTopicsPropertyName() { + return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS; } + /** + * {@inheritDoc}. + */ @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - NoopTopicSink noopSink; - synchronized (this) { - if (!this.noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = this.noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); + protected NoopTopicSink build(List<String> servers, String topic) { + return new NoopTopicSink(servers, topic); } + /** + * {@inheritDoc}. + */ @Override - public void destroy() { - final List<NoopTopicSink> sinks = this.inventory(); - for (final NoopTopicSink sink : sinks) { - sink.shutdown(); - } - - synchronized (this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } + public String toString() { + return "NoopTopicSinkFactory [" + super.toString() + "]"; } - @Override - public List<NoopTopicSink> inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } } + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java new file mode 100644 index 00000000..c3215e04 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSource; + +/** + * No Operation Topic Source. + */ +public class NoopTopicSource extends NoopTopicEndpoint implements TopicSource { + + /** + * Factory. + */ + public static final NoopTopicSourceFactory factory = new NoopTopicSourceFactory(); + + /** + * {@inheritDoc}. + */ + public NoopTopicSource(List<String> servers, String topic) { + super(servers, topic); + } + + /** + * {@inheritDoc}. + */ + @Override + public boolean offer(String event) { + return super.io(event); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicSource[" + super.toString() + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java new file mode 100644 index 00000000..9623b4fa --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +/** + * No Operation Topic Source Factory. + */ +public class NoopTopicSourceFactory extends NoopTopicFactory<NoopTopicSource> { + + /** + * {@inheritDoc}. + */ + @Override + protected String getTopicsPropertyName() { + return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS; + } + + /** + * {@inheritDoc}. + */ + @Override + protected NoopTopicSource build(List<String> servers, String topic) { + return new NoopTopicSource(servers, topic); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "NoopTopicSourceFactory[" + super.toString() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java new file mode 100644 index 00000000..897a8e19 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Topic Base Factory. + * + * @param <T> Type. + */ +public interface TopicBaseFactory<T extends Topic> { + + /** + * build a TopicBase instance. + * + * @param properties properties. + * @return T instance. + */ + List<T> build(Properties properties); + + /** + * build a TopicBase instance. + * + * @param servers servers. + * @param topic topic. + * @param managed managed. + * @return T instance. + */ + T build(List<String> servers, String topic, boolean managed); + + /** + * destroy TopicBase instance. + * @param topic topic. + */ + void destroy(String topic); + + /** + * destroy. + */ + void destroy(); + + /** + * get T instance. + * + * @param topic topic. + * @return T instance. + */ + T get(String topic); + + /** + * inventory of T instances. + * + * @return T instance list. + */ + List<T> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java new file mode 100644 index 00000000..f958bd01 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java @@ -0,0 +1,197 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Topic Factory implementation that indexes T instances in a hash table. + */ +public abstract class TopicBaseHashedFactory<T extends Topic> implements TopicBaseFactory<T> { + + protected static final String MISSING_TOPIC_MESSAGE = "A topic must be provided"; + protected static final String MISSING_SERVERS_MESSAGE = "Servers must be provided"; + + /** + * endpoints. + */ + protected final HashMap<String, T> endpoints = new HashMap<>(); + + /** + * get the topic names. + * + * @param properties properties. + * @return list of topic names. + */ + protected abstract List<String> getTopicNames(Properties properties); + + /** + * get the servers that this topic uses. + * + * @param topicName name. + * @param properties properties. + * @return list of servers. + */ + protected abstract List<String> getServers(String topicName, Properties properties); + + /** + * is this topic managed? + * + * @param topicName name. + * @param properties properties. + * @return if managed. + */ + protected abstract boolean isManaged(String topicName, Properties properties); + + /** + * construct an instance of an endpoint. + * + * @param servers servers, + * @param topic topic. + * @return an instance of T. + */ + protected abstract T build(List<String> servers, String topic); + + /** + * {@inheritDoc}. + */ + @Override + public List<T> build(Properties properties) { + List<String> topicNames = getTopicNames(properties); + if (topicNames == null || topicNames.isEmpty()) { + return Collections.emptyList(); + } + + List<T> newEndpoints = new ArrayList<>(); + synchronized (this) { + for (String name : topicNames) { + if (this.endpoints.containsKey(name)) { + newEndpoints.add(this.endpoints.get(name)); + continue; + } + + newEndpoints.add(this.build(getServers(name, properties), name, isManaged(name, properties))); + } + } + return newEndpoints; + } + + /** + * {@inheritDoc}. + */ + @Override + public T build(List<String> servers, String topic, boolean managed) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException(MISSING_SERVERS_MESSAGE); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + synchronized (this) { + if (this.endpoints.containsKey(topic)) { + return this.endpoints.get(topic); + } + + T endpoint = build(servers, topic); + if (managed) { + this.endpoints.put(topic, endpoint); + } + + return endpoint; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + T endpoint; + synchronized (this) { + if (!this.endpoints.containsKey(topic)) { + return; + } + + endpoint = this.endpoints.remove(topic); + } + endpoint.shutdown(); + } + + /** + * {@inheritDoc}. + */ + @Override + public void destroy() { + final List<T> snapshotEndpoints = this.inventory(); + for (final T snapshot : snapshotEndpoints) { + snapshot.shutdown(); + } + + synchronized (this) { + this.endpoints.clear(); + } + } + + /** + * {@inheritDoc}. + */ + @Override + public T get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC_MESSAGE); + } + + synchronized (this) { + if (this.endpoints.containsKey(topic)) { + return this.endpoints.get(topic); + } else { + throw new IllegalStateException(topic + " not found"); + } + } + } + + /** + * {@inheritDoc}. + */ + @Override + public List<T> inventory() { + return new ArrayList<>(this.endpoints.values()); + } + + /** + * {@inheritDoc}. + */ + @Override + public String toString() { + return "TopicBaseHashedFactory[ " + super.toString() + " ]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index cc71748d..883ba7d2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -1,8 +1,8 @@ /*- * ============LICENSE_START======================================================= - * policy-core + * ONAP * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,7 @@ public interface PolicyEndPointProperties { String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX = ".dme2.sessionStickinessRequired"; + String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics"; String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics"; /* HTTP Server Properties */ @@ -98,5 +99,4 @@ public interface PolicyEndPointProperties { String PROPERTY_HTTP_CLIENT_SERVICES = "http.client.services"; String PROPERTY_HTTP_URL_SUFFIX = PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX; - } |