aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java692
1 files changed, 692 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
new file mode 100644
index 00000000..b3f236f7
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
@@ -0,0 +1,692 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSource;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Abstraction to managed the system's Networked Topic Endpoints,
+ * sources of all events input into the System.
+ */
+public interface TopicEndpoint extends Startable, Lockable {
+
+ /**
+ * Add Topic Sources to the communication infrastructure initialized per
+ * properties
+ *
+ * @param properties properties for Topic Source construction
+ * @return a generic Topic Source
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<? extends TopicSource> addTopicSources(Properties properties)
+ throws IllegalArgumentException;
+
+ /**
+ * Add Topic Sinks to the communication infrastructure initialized per
+ * properties
+ *
+ * @param properties properties for Topic Sink construction
+ * @return a generic Topic Sink
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<? extends TopicSink> addTopicSinks(Properties properties)
+ throws IllegalArgumentException;
+
+ /**
+ * gets all Topic Sources
+ * @return the Topic Source List
+ */
+ List<? extends TopicSource> getTopicSources();
+
+ /**
+ * get the Topic Sources for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source List
+ * @throws IllegalStateException if the entity is in an invalid state
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<? extends TopicSource> getTopicSources(List<String> topicNames)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * gets the Topic Source for the given topic name and
+ * underlying communication infrastructure type
+ *
+ * @param commType communication infrastructure type
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ * @throws UnsupportedOperationException if the operation is not supported.
+ */
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType,
+ String topicName)
+ throws IllegalStateException, IllegalArgumentException,
+ UnsupportedOperationException;
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the UEB Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSource getUebTopicSource(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the DMAAP Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSource getDmaapTopicSource(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * get the Topic Sinks for the given topic name
+ *
+ * @param topicNames the topic names
+ * @return the Topic Sink List
+ * @throws IllegalStateException
+ * @throws IllegalArgumentException
+ */
+ public List<? extends TopicSink> getTopicSinks(List<String> topicNames)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * get the Topic Sinks for the given topic name and
+ * underlying communication infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType,
+ String topicName)
+ throws IllegalStateException, IllegalArgumentException,
+ UnsupportedOperationException;
+
+ /**
+ * get the Topic Sinks for the given topic name and
+ * all the underlying communication infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<? extends TopicSink> getTopicSinks(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSink getUebTopicSink(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSink getDmaapTopicSink(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
+ * gets only the UEB Topic Sources
+ * @return the UEB Topic Source List
+ */
+ public List<UebTopicSource> getUebTopicSources();
+
+ /**
+ * gets only the DMAAP Topic Sources
+ * @return the DMAAP Topic Source List
+ */
+ public List<DmaapTopicSource> getDmaapTopicSources();
+
+ /**
+ * gets all Topic Sinks
+ * @return the Topic Sink List
+ */
+ public List<? extends TopicSink> getTopicSinks();
+
+ /**
+ * gets only the UEB Topic Sinks
+ * @return the UEB Topic Sink List
+ */
+ public List<UebTopicSink> getUebTopicSinks();
+
+ /**
+ * gets only the DMAAP Topic Sinks
+ * @return the DMAAP Topic Sink List
+ */
+ public List<DmaapTopicSink> getDmaapTopicSinks();
+
+ /**
+ * singleton for global access
+ */
+ public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
+}
+
+/*
+ * ----------------- implementation -------------------
+ */
+
+/**
+ * This implementation of the Topic Endpoint Manager, proxies operations to appropriate
+ * implementations according to the communication infrastructure that are supported
+ */
+class ProxyTopicEndpointManager implements TopicEndpoint {
+ // get an instance of logger
+ private static Logger logger = FlexLogger.getLogger(ProxyTopicEndpointManager.class);
+ /**
+ * Is this element locked?
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * Is this element alive?
+ */
+ protected volatile boolean alive = false;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSource> addTopicSources(Properties properties) throws IllegalArgumentException {
+
+ // 1. Create UEB Sources
+ // 2. Create DMAAP Sources
+
+ List<TopicSource> sources = new ArrayList<TopicSource>();
+
+ sources.addAll(UebTopicSource.factory.build(properties));
+ sources.addAll(DmaapTopicSource.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (TopicSource source : sources) {
+ source.lock();
+ }
+ }
+
+ return sources;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSink> addTopicSinks(Properties properties) throws IllegalArgumentException {
+ // 1. Create UEB Sinks
+ // 2. Create DMAAP Sinks
+
+ List<TopicSink> sinks = new ArrayList<TopicSink>();
+
+ sinks.addAll(UebTopicSink.factory.build(properties));
+ sinks.addAll(DmaapTopicSink.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (TopicSink sink : sinks) {
+ sink.lock();
+ }
+ }
+
+ return sinks;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSource> getTopicSources() {
+
+ List<TopicSource> sources = new ArrayList<TopicSource>();
+
+ sources.addAll(UebTopicSource.factory.inventory());
+ sources.addAll(DmaapTopicSource.factory.inventory());
+
+ return sources;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSink> getTopicSinks() {
+
+ List<TopicSink> sinks = new ArrayList<TopicSink>();
+
+ sinks.addAll(UebTopicSink.factory.inventory());
+ sinks.addAll(DmaapTopicSink.factory.inventory());
+
+ return sinks;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @JsonIgnore
+ @Override
+ public List<UebTopicSource> getUebTopicSources() {
+ return UebTopicSource.factory.inventory();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSource> getDmaapTopicSources() {
+ return DmaapTopicSource.factory.inventory();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @JsonIgnore
+ @Override
+ public List<UebTopicSink> getUebTopicSinks() {
+ return UebTopicSink.factory.inventory();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSink> getDmaapTopicSinks() {
+ return DmaapTopicSink.factory.inventory();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean start() throws IllegalStateException {
+
+ synchronized (this) {
+ if (this.locked) {
+ throw new IllegalStateException(this + " is locked");
+ }
+
+ if (this.alive) {
+ return true;
+ }
+
+ this.alive = true;
+ }
+
+ List<Startable> endpoints = getEndpoints();
+
+ boolean success = true;
+ for (Startable endpoint: endpoints) {
+ try {
+ success = endpoint.start() && success;
+ } catch (Exception e) {
+ success = false;
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString());
+ }
+ }
+
+ return success;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean stop() throws IllegalStateException {
+
+ /*
+ * stop regardless if it is locked, in other
+ * words, stop operation has precedence over
+ * locks.
+ */
+ synchronized (this) {
+ this.alive = false;
+ }
+
+ List<Startable> endpoints = getEndpoints();
+
+ boolean success = true;
+ for (Startable endpoint: endpoints) {
+ try {
+ success = endpoint.stop() && success;
+ } catch (Exception e) {
+ success = false;
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString());
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ *
+ * @return list of managed endpoints
+ */
+ @JsonIgnore
+ protected List<Startable> getEndpoints() {
+ List<Startable> endpoints = new ArrayList<Startable>();
+
+ endpoints.addAll(this.getTopicSources());
+ endpoints.addAll(this.getTopicSinks());
+
+ return endpoints;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() throws IllegalStateException {
+ UebTopicSource.factory.destroy();
+ UebTopicSink.factory.destroy();
+
+ DmaapTopicSource.factory.destroy();
+ DmaapTopicSink.factory.destroy();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean lock() {
+
+ synchronized (this) {
+ if (locked)
+ return true;
+
+ this.locked = true;
+ }
+
+ for (TopicSource source: this.getTopicSources()) {
+ source.lock();
+ }
+
+ for (TopicSink sink: this.getTopicSinks()) {
+ sink.lock();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean unlock() {
+ synchronized (this) {
+ if (!locked)
+ return true;
+
+ this.locked = false;
+ }
+
+ for (TopicSource source: this.getTopicSources()) {
+ source.unlock();
+ }
+
+ for (TopicSink sink: this.getTopicSinks()) {
+ sink.unlock();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSource> getTopicSources(List<String> topicNames)
+ throws IllegalStateException, IllegalArgumentException {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ List<TopicSource> sources = new ArrayList<TopicSource>();
+ for (String topic: topicNames) {
+ try {
+ TopicSource uebSource = this.getUebTopicSource(topic);
+ if (uebSource != null)
+ sources.add(uebSource);
+ } catch (Exception e) {
+ logger.info("No UEB source for topic: " + topic);
+ }
+
+ try {
+ TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+ if (dmaapSource != null)
+ sources.add(dmaapSource);
+ } catch (Exception e) {
+ logger.info("No DMAAP source for topic: " + topic);
+ }
+ }
+ return sources;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSink> getTopicSinks(List<String> topicNames)
+ throws IllegalStateException, IllegalArgumentException {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ List<TopicSink> sinks = new ArrayList<TopicSink>();
+ for (String topic: topicNames) {
+ try {
+ TopicSink uebSink = this.getUebTopicSink(topic);
+ if (uebSink != null)
+ sinks.add(uebSink);
+ } catch (Exception e) {
+ logger.info("No UEB sink for topic: " + topic);
+ }
+
+ try {
+ TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+ if (dmaapSink != null)
+ sinks.add(dmaapSink);
+ } catch (Exception e) {
+ logger.info("No DMAAP sink for topic: " + topic);
+ }
+ }
+ return sinks;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+ throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException {
+
+ if (commType == null) {
+ throw new IllegalArgumentException
+ ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException
+ ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSource(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSource(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+ throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException {
+ if (commType == null) {
+ throw new IllegalArgumentException
+ ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException
+ ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSink(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSink(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<? extends TopicSink> getTopicSinks(String topicName)
+ throws IllegalStateException, IllegalArgumentException {
+
+ if (topicName == null) {
+ throw new IllegalArgumentException
+ ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ List<TopicSink> sinks = new ArrayList<TopicSink>();
+
+ try {
+ sinks.add(this.getUebTopicSink(topicName));
+ } catch (Exception e) {
+ ;
+ }
+
+ try {
+ sinks.add(this.getDmaapTopicSink(topicName));
+ } catch (Exception e) {
+ ;
+ }
+
+ return sinks;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public UebTopicSource getUebTopicSource(String topicName) throws IllegalStateException, IllegalArgumentException {
+ return UebTopicSource.factory.get(topicName);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public UebTopicSink getUebTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+ return UebTopicSink.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSource getDmaapTopicSource(String topicName)
+ throws IllegalStateException, IllegalArgumentException {
+ return DmaapTopicSource.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSink getDmaapTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+ return DmaapTopicSink.factory.get(topicName);
+ }
+
+}