diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java | 658 |
1 files changed, 0 insertions, 658 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java deleted file mode 100644 index 5c04bb8f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java +++ /dev/null @@ -1,658 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.event.comm.bus.DmaapTopicSink; -import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; -import org.onap.policy.drools.event.comm.bus.NoopTopicSink; -import org.onap.policy.drools.event.comm.bus.UebTopicSink; -import org.onap.policy.drools.event.comm.bus.UebTopicSource; -import org.onap.policy.drools.properties.Lockable; -import org.onap.policy.drools.properties.Startable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into - * the System. - */ -public interface TopicEndpoint extends Startable, Lockable { - - /** - * singleton for global access - */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); - - /** - * Add Topic Sources to the communication infrastructure initialized per properties - * - * @param properties properties for Topic Source construction - * @return a generic Topic Source - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSource> addTopicSources(Properties properties); - - /** - * Add Topic Sinks to the communication infrastructure initialized per properties - * - * @param properties properties for Topic Sink construction - * @return a generic Topic Sink - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSink> addTopicSinks(Properties properties); - - /** - * gets all Topic Sources - * - * @return the Topic Source List - */ - List<TopicSource> getTopicSources(); - - /** - * get the Topic Sources for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source List - * @throws IllegalStateException if the entity is in an invalid state - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSource> getTopicSources(List<String> topicNames); - - /** - * gets the Topic Source for the given topic name and underlying communication infrastructure type - * - * @param commType communication infrastructure type - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - * @throws UnsupportedOperationException if the operation is not supported. - */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource getUebTopicSource(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource getDmaapTopicSource(String topicName); - - /** - * get the Topic Sinks for the given topic name - * - * @param topicNames the topic names - * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException - */ - public List<TopicSink> getTopicSinks(List<String> topicNames); - - /** - * get the Topic Sinks for the given topic name and underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); - - /** - * get the Topic Sinks for the given topic name and all the underlying communication - * infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSink> getTopicSinks(String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink getUebTopicSink(String topicName); - - /** - * get the no-op Topic Sink for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink getNoopTopicSink(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * gets only the UEB Topic Sources - * - * @return the UEB Topic Source List - */ - public List<UebTopicSource> getUebTopicSources(); - - /** - * gets only the DMAAP Topic Sources - * - * @return the DMAAP Topic Source List - */ - public List<DmaapTopicSource> getDmaapTopicSources(); - - /** - * gets all Topic Sinks - * - * @return the Topic Sink List - */ - public List<TopicSink> getTopicSinks(); - - /** - * gets only the UEB Topic Sinks - * - * @return the UEB Topic Sink List - */ - public List<UebTopicSink> getUebTopicSinks(); - - /** - * gets only the DMAAP Topic Sinks - * - * @return the DMAAP Topic Sink List - */ - public List<DmaapTopicSink> getDmaapTopicSinks(); - - /** - * gets only the NOOP Topic Sinks - * - * @return the NOOP Topic Sinks List - */ - public List<NoopTopicSink> getNoopTopicSinks(); -} - - -/* - * ----------------- implementation ------------------- - */ - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported - */ -class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - final List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - final List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over locks. - */ - synchronized (this) { - this.alive = false; - } - - final List<Startable> endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - final List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - NoopTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) - return true; - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) - return true; - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) - sources.add(uebSource); - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) - sources.add(dmaapSource); - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List<TopicSink> sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) - sinks.add(uebSink); - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) - sinks.add(dmaapSink); - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) - sinks.add(noopSink); - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - -private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); -} - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - -} |