summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java12
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java36
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java8
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java7
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/ApiKeyEnabled.java (renamed from policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java)12
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java2
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java2
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java126
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java226
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java2
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java2
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java87
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java93
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java2
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java4
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java134
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java229
17 files changed, 691 insertions, 293 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
index 54f49fdc..62640bd8 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
@@ -22,11 +22,17 @@ package org.openecomp.policy.drools.event.comm;
import java.util.List;
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+
/**
* Essential Topic Data
*/
-public interface Topic {
+public interface Topic extends TopicRegisterable, Startable, Lockable {
+ /**
+ * network logger
+ */
public static final String NETWORK_LOGGER = "network";
/**
@@ -42,6 +48,10 @@ public interface Topic {
*/
DMAAP,
/**
+ * NOOP for internal use only
+ */
+ NOOP,
+ /**
* REST Communication Infrastructure
*/
REST
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
index fa73ecb7..337d78e5 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.openecomp.policy.drools.event.comm.bus.NoopTopicSink;
import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
import org.openecomp.policy.drools.event.comm.bus.UebTopicSource;
import org.slf4j.LoggerFactory;
@@ -181,6 +182,19 @@ public interface TopicEndpoint extends Startable, Lockable {
throws IllegalStateException, IllegalArgumentException;
/**
+ * get the no-op Topic Sink for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for
+ * example multiple TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink getNoopTopicSink(String topicName)
+ throws IllegalStateException, IllegalArgumentException;
+
+ /**
* get the DMAAP Topic Source for the given topic name
*
* @param topicName the topic name
@@ -224,6 +238,12 @@ public interface TopicEndpoint extends Startable, Lockable {
public List<DmaapTopicSink> getDmaapTopicSinks();
/**
+ * gets only the NOOP Topic Sinks
+ * @return the NOOP Topic Sinks List
+ */
+ public List<NoopTopicSink> getNoopTopicSinks();
+
+ /**
* singleton for global access
*/
public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
@@ -287,6 +307,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
sinks.addAll(UebTopicSink.factory.build(properties));
sinks.addAll(DmaapTopicSink.factory.build(properties));
+ sinks.addAll(NoopTopicSink.factory.build(properties));
if (this.isLocked()) {
for (TopicSink sink : sinks) {
@@ -321,6 +342,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
sinks.addAll(UebTopicSink.factory.inventory());
sinks.addAll(DmaapTopicSink.factory.inventory());
+ sinks.addAll(NoopTopicSink.factory.inventory());
return sinks;
}
@@ -360,6 +382,15 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
public List<DmaapTopicSink> getDmaapTopicSinks() {
return DmaapTopicSink.factory.inventory();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @JsonIgnore
+ @Override
+ public List<NoopTopicSink> getNoopTopicSinks() {
+ return NoopTopicSink.factory.inventory();
+ }
/**
* {@inheritDoc}
@@ -689,5 +720,10 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
public DmaapTopicSink getDmaapTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
return DmaapTopicSink.factory.get(topicName);
}
+
+ @Override
+ public NoopTopicSink getNoopTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+ return NoopTopicSink.factory.get(topicName);
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
index 2250b1ea..c2c47798 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
@@ -20,22 +20,20 @@
package org.openecomp.policy.drools.event.comm;
-import org.openecomp.policy.drools.properties.Lockable;
-import org.openecomp.policy.drools.properties.Startable;
-
/**
* Marks a given Topic Endpoint as able to send messages over a topic
*/
-public interface TopicSink extends Topic, Startable, Lockable {
+public interface TopicSink extends Topic {
/**
* Sends a string message over this Topic Endpoint
*
* @param message message to send
+ *
* @return true if the send operation succeeded, false otherwise
* @throws IllegalArgumentException an invalid message has been provided
* @throws IllegalStateException the entity is in an state that prevents
- * it from sending messages, for example, locked or stopped.
+ * it from sending messages, for example, locked or stopped.
*/
public boolean send(String message) throws IllegalArgumentException, IllegalStateException;
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
index 0dfbe1c4..6d1cbda4 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
@@ -20,14 +20,11 @@
package org.openecomp.policy.drools.event.comm;
-import org.openecomp.policy.drools.properties.Lockable;
-import org.openecomp.policy.drools.properties.Startable;
-
/**
* Marker for a Topic Entity, indicating that the entity is able to read
* over a topic
*/
-public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockable {
+public interface TopicSource extends Topic {
/**
* pushes an event into the source programatically
@@ -37,4 +34,4 @@ public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockab
*/
public boolean offer(String event);
-}
+} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/ApiKeyEnabled.java
index c38f627e..3afd0add 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/ApiKeyEnabled.java
@@ -20,7 +20,17 @@
package org.openecomp.policy.drools.event.comm.bus;
-public interface BusTopic {
+/**
+ * API
+ */
+public interface ApiKeyEnabled {
+ /**
+ * @return api key
+ */
public String getApiKey();
+
+ /**
+ * @return api secret
+ */
public String getApiSecret();
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
index 30978c27..59525334 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
@@ -25,7 +25,7 @@ import org.openecomp.policy.drools.event.comm.TopicSink;
/**
* Topic Sink over Bus Infrastructure (DMAAP/UEB)
*/
-public interface BusTopicSink extends BusTopic, TopicSink {
+public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
/**
* Log Failures after X number of retries
*/
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
index e6a46d2f..6796fc0a 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
@@ -26,7 +26,7 @@ import org.openecomp.policy.drools.event.comm.TopicSource;
* Generic Topic Source for UEB/DMAAP Communication Infrastructure
*
*/
-public interface BusTopicSource extends BusTopic, TopicSource {
+public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
/**
* Default Consumer Instance Value
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java
new file mode 100644
index 00000000..e9d503e2
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.TopicSink;
+import org.openecomp.policy.drools.event.comm.bus.internal.TopicBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOOP topic sink
+ */
+public class NoopTopicSink extends TopicBase implements TopicSink {
+
+ /**
+ * factory
+ */
+ public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory();
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class);
+
+ /**
+ * net logger
+ */
+ private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
+
+ /**
+ * constructor
+ * @param servers servers
+ * @param topic topic
+ * @throws IllegalArgumentException if an invalid argument has been passed in
+ */
+ public NoopTopicSink(List<String> servers, String topic) throws IllegalArgumentException {
+ super(servers, topic);
+ }
+
+ @Override
+ public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
+
+ if (message == null || message.isEmpty())
+ throw new IllegalArgumentException("Message to send is empty");
+
+ if (!this.alive)
+ throw new IllegalStateException(this + " is stopped");
+
+ try {
+ synchronized (this) {
+ this.recentEvents.add(message);
+ }
+
+ netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(),
+ this.topic, System.lineSeparator(), message);
+
+ broadcast(message);
+ } catch (Exception e) {
+ logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ return CommInfrastructure.NOOP;
+ }
+
+ @Override
+ public boolean start() throws IllegalStateException {
+ logger.info("{}: starting", this);
+
+ synchronized(this) {
+
+ if (this.alive)
+ return true;
+
+ if (locked)
+ throw new IllegalStateException(this + " is locked.");
+
+ this.alive = true;
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean stop() throws IllegalStateException {
+ synchronized(this) {
+ this.alive = false;
+ }
+ return true;
+ }
+
+ @Override
+ public void shutdown() throws IllegalStateException {
+ this.stop();
+ }
+
+ @Override
+ public String toString() {
+ return "NoopTopicSink [toString()=" + super.toString() + "]";
+ }
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
new file mode 100644
index 00000000..788ea683
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
@@ -0,0 +1,226 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Noop Topic Sink Factory
+ */
+public interface NoopTopicSinkFactory {
+
+ /**
+ * Creates noop topic sinks based on properties files
+ *
+ * @param properties Properties containing initialization values
+ *
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<NoopTopicSink> build(Properties properties)
+ throws IllegalArgumentException;
+
+ /**
+ * builds a noop sink
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param managed is this sink endpoint managed?
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed)
+ throws IllegalArgumentException;
+
+ /**
+ * Destroys a sink based on the topic
+ *
+ * @param topic topic name
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public void destroy(String topic);
+
+ /**
+ * gets a sink based on topic name
+ * @param topic the topic name
+ *
+ * @return a sink with topic name
+ * @throws IllegalArgumentException if an invalid topic is provided
+ * @throws IllegalStateException if the sink is in an incorrect state
+ */
+ public NoopTopicSink get(String topic)
+ throws IllegalArgumentException, IllegalStateException;
+
+ /**
+ * Provides a snapshot of the UEB Topic Writers
+ * @return a list of the UEB Topic Writers
+ */
+ public List<NoopTopicSink> inventory();
+
+ /**
+ * Destroys all sinks
+ */
+ public void destroy();
+}
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of noop sinks
+ */
+class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
+ /**
+ * Logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
+
+ /**
+ * noop topic sinks map
+ */
+ protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<String, NoopTopicSink>();
+
+ @Override
+ public List<NoopTopicSink> build(Properties properties) throws IllegalArgumentException {
+
+ String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
+ if (sinkTopics == null || sinkTopics.isEmpty()) {
+ logger.info("{}: no topic for noop sink", this);
+ return new ArrayList<NoopTopicSink>();
+ }
+
+ List<String> sinkTopicList = new ArrayList<String>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
+ List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
+ synchronized(this) {
+ for (String topic: sinkTopicList) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ newSinks.add(this.noopTopicSinks.get(topic));
+ continue;
+ }
+
+ String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." +
+ topic +
+ PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+ if (servers == null || servers.isEmpty())
+ servers = "noop";
+
+ List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+ String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+ boolean managed = true;
+ if (managedString != null && !managedString.isEmpty()) {
+ managed = Boolean.parseBoolean(managedString);
+ }
+
+ NoopTopicSink noopSink = this.build(serverList, topic, managed);
+ newSinks.add(noopSink);
+ }
+ return newSinks;
+ }
+ }
+
+ @Override
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed) throws IllegalArgumentException {
+ if (servers == null) {
+ servers = new ArrayList<>();
+ }
+
+ if (servers.isEmpty()) {
+ servers.add("noop");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (noopTopicSinks.containsKey(topic)) {
+ return noopTopicSinks.get(topic);
+ }
+
+ NoopTopicSink sink =
+ new NoopTopicSink(servers, topic);
+
+ if (managed)
+ noopTopicSinks.put(topic, sink);
+
+ return sink;
+ }
+ }
+
+ @Override
+ public void destroy(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ NoopTopicSink noopSink;
+ synchronized(this) {
+ if (!noopTopicSinks.containsKey(topic)) {
+ return;
+ }
+
+ noopSink = noopTopicSinks.remove(topic);
+ }
+
+ noopSink.shutdown();
+ }
+
+ @Override
+ public void destroy() {
+ List<NoopTopicSink> sinks = this.inventory();
+ for (NoopTopicSink sink: sinks) {
+ sink.shutdown();
+ }
+
+ synchronized(this) {
+ this.noopTopicSinks.clear();
+ }
+ }
+
+ @Override
+ public NoopTopicSink get(String topic) throws IllegalArgumentException, IllegalStateException {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized(this) {
+ if (noopTopicSinks.containsKey(topic)) {
+ return noopTopicSinks.get(topic);
+ } else {
+ throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+ }
+ }
+ }
+
+ @Override
+ public List<NoopTopicSink> inventory() {
+ return new ArrayList<>(this.noopTopicSinks.values());
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
index efa4dc5e..3966d1ff 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
@@ -26,7 +26,7 @@ package org.openecomp.policy.drools.event.comm.bus;
public interface UebTopicSink extends BusTopicSink {
/**
- * Factory of UebTopicWriter for instantiation and management purposes
+ * Factory of UEB Topic Sinks for instantiation and management purposes
*/
public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
index 0469c4a3..58d5dff5 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
@@ -46,7 +46,7 @@ public interface UebTopicSinkFactory {
* @param partitionKey Consumer Group
* @param managed is this sink endpoint managed?
*
- * @return an UEB Topic Writer
+ * @return an UEB Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
public UebTopicSink build(List<String> servers,
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
index 4ac1c6fc..4beaa1f1 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
@@ -22,22 +22,32 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
-import org.openecomp.policy.drools.event.comm.Topic;
-import org.openecomp.policy.drools.event.comm.bus.BusTopic;
+import org.openecomp.policy.drools.event.comm.bus.ApiKeyEnabled;
-public abstract class BusTopicBase implements BusTopic, Topic {
-
- protected List<String> servers;
-
- protected String topic;
+/**
+ * Bus Topic Base
+ */
+public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
+ /**
+ * API Key
+ */
protected String apiKey;
+
+ /**
+ * API Secret
+ */
protected String apiSecret;
+
+ /**
+ * Use https
+ */
protected boolean useHttps;
- protected boolean allowSelfSignedCerts;
- protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+ /**
+ * allow self signed certificates
+ */
+ protected boolean allowSelfSignedCerts;
/**
* Instantiates a new Bus Topic Base
@@ -60,16 +70,7 @@ public abstract class BusTopicBase implements BusTopic, Topic {
boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("An UEB Topic must be provided");
- }
-
- this.servers = servers;
- this.topic = topic;
+ super(servers, topic);
this.apiKey = apiKey;
this.apiSecret = apiSecret;
@@ -77,73 +78,35 @@ public abstract class BusTopicBase implements BusTopic, Topic {
this.allowSelfSignedCerts = allowSelfSignedCerts;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public String getTopic() {
- return topic;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<String> getServers() {
- return servers;
- }
-
- /**
- * {@inheritDoc}
- */
@Override
public String getApiKey() {
return apiKey;
}
- /**
- * {@inheritDoc}
- */
@Override
public String getApiSecret() {
return apiSecret;
}
/**
- * @return useHttps
+ * @return if using https
*/
public boolean isUseHttps(){
return useHttps;
}
/**
- * @return allowSelfSignedCerts
+ * @return if self signed certificates are allowed
*/
public boolean isAllowSelfSignedCerts(){
return allowSelfSignedCerts;
}
-
- /**
- * @return the recentEvents
- */
- @Override
- public synchronized String[] getRecentEvents() {
- String[] events = new String[recentEvents.size()];
- return recentEvents.toArray(events);
- }
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("UebTopicBase [servers=").append(servers)
- .append(", topic=").append(topic)
- .append(", apiKey=").append(apiKey)
- .append(", apiSecret=").append(apiSecret)
- .append(", useHttps=").append(useHttps)
- .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts)
- .append("]");
- return builder.toString();
+ return "BusTopicBase [apiKey=" + apiKey + ", apiSecret=" + apiSecret + ", useHttps=" + useHttps
+ + ", allowSelfSignedCerts=" + allowSelfSignedCerts + ", toString()=" + super.toString() + "]";
}
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
index 64037749..3f1f3610 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -46,23 +46,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
protected String partitionId;
/**
- * Am I running?
- * reflects invocation of start()/stop()
- * !locked & start() => alive
- * stop() => !alive
- */
- protected volatile boolean alive = false;
-
- /**
- * Am I locked?
- * reflects invocation of lock()/unlock() operations
- * locked => !alive (but not in the other direction necessarily)
- * locked => !offer, !run, !start, !stop (but this last one is obvious
- * since locked => !alive)
- */
- protected volatile boolean locked = false;
-
- /**
* message bus publisher
*/
protected BusPublisher publisher;
@@ -99,8 +82,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* {@inheritDoc}
*/
@Override
- public boolean start() throws IllegalStateException {
-
+ public boolean start() throws IllegalStateException {
logger.info("{}: starting", this);
synchronized(this) {
@@ -144,64 +126,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean lock() {
-
- logger.info("{}: locking", this);
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- return this.stop();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean unlock() {
-
- logger.info("{}: unlocking", this);
-
- synchronized(this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- try {
- return this.start();
- } catch (Exception e) {
- logger.warn("{}: cannot start after unlocking because of {}",
- this, e.getMessage(), e);
- return false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAlive() {
- return this.alive;
}
/**
@@ -226,7 +150,8 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(),
this.topic, System.lineSeparator(), message);
- publisher.send(this.partitionId, message);
+ publisher.send(this.partitionId, message);
+ broadcast(message);
} catch (Exception e) {
logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
return false;
@@ -260,10 +185,12 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
this.stop();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public abstract CommInfrastructure getTopicCommInfrastructure();
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
+ .append(", publisher=").append(publisher).append("]");
+ return builder.toString();
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index a78dd0fc..37051e2f 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -91,8 +91,6 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
this.longitude = longitude;
this.additionalProps = additionalProps;
-
- this.init();
}
public InlineDmaapTopicSink(List<String> servers, String topic,
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
index 3b091f5a..d657f06e 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
/**
* This implementation publishes events for the associated UEB topic,
@@ -35,8 +36,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
/**
* logger
*/
- private static org.slf4j.Logger logger =
- LoggerFactory.getLogger(InlineUebTopicSink.class);
+ private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
/**
* Argument-based UEB Topic Writer instantiation
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 85da3f01..0fa90b50 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -79,15 +79,6 @@ public abstract class SingleThreadedBusTopicSource
protected volatile boolean alive = false;
/**
- * Am I locked?
- * reflects invocation of lock()/unlock() operations
- * locked => !alive (but not in the other direction necessarily)
- * locked => !offer, !run, !start, !stop (but this last one is obvious
- * since locked => !alive)
- */
- protected volatile boolean locked = false;
-
- /**
* Independent thread reading message over my topic
*/
protected Thread busPollerThread;
@@ -157,52 +148,28 @@ public abstract class SingleThreadedBusTopicSource
*/
public abstract void init() throws Exception;
- /**
- * {@inheritDoc}
- */
@Override
public void register(TopicListener topicListener)
throws IllegalArgumentException {
- logger.info("{}: registering {}", this, topicListener);
-
- synchronized(this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- /* check that this listener is not registered already */
- for (TopicListener listener: this.topicListeners) {
- if (listener == topicListener) {
- // already registered
- return;
- }
- }
-
- this.topicListeners.add(topicListener);
- }
+ super.register(topicListener);
try {
- this.start();
+ if (!alive && !locked)
+ this.start();
+ else
+ logger.info("{}: register: start not attempted", this);
} catch (Exception e) {
logger.warn("{}: cannot start after registration of because of: {}",
this, topicListener, e.getMessage(), e);
}
}
- /**
- * {@inheritDoc}
- */
@Override
public void unregister(TopicListener topicListener) {
-
- logger.info("{}: unregistering {}", this, topicListener);
-
boolean stop = false;
synchronized (this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- this.topicListeners.remove(topicListener);
+ super.unregister(topicListener);
stop = (this.topicListeners.isEmpty());
}
@@ -211,49 +178,6 @@ public abstract class SingleThreadedBusTopicSource
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean lock() {
-
- logger.info("{}: locking", this);
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- return this.stop();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean unlock() {
- logger.info("{}: unlocking", this);
-
- synchronized(this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- try {
- return this.start();
- } catch (Exception e) {
- logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
- return false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
@Override
public boolean start() throws IllegalStateException {
logger.info("{}: starting", this);
@@ -286,9 +210,6 @@ public abstract class SingleThreadedBusTopicSource
return this.alive;
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean stop() {
logger.info("{}: stopping", this);
@@ -312,49 +233,6 @@ public abstract class SingleThreadedBusTopicSource
return true;
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- /**
- * broadcast event to all listeners
- *
- * @param message the event
- * @return true if all notifications are performed with no error, false otherwise
- */
- protected boolean broadcast(String message) {
-
- /* take a snapshot of listeners */
- List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
-
- boolean success = true;
- for (TopicListener topicListener: snapshotListeners) {
- try {
- topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
- } catch (Exception e) {
- logger.warn("{}: notification error @ {} because of {}",
- this, topicListener, e.getMessage(), e);
- success = false;
- }
- }
- return success;
- }
-
- /**
- * take a snapshot of current topic listeners
- *
- * @return the topic listeners
- */
- protected synchronized List<TopicListener> snapshotTopicListeners() {
- @SuppressWarnings("unchecked")
- List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
- return listeners;
- }
/**
* Run thread method for the Bus Reader
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java
new file mode 100644
index 00000000..ff3d4667
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java
@@ -0,0 +1,229 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.TopicListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class TopicBase implements Topic {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
+
+ /**
+ * list of servers
+ */
+ protected List<String> servers;
+
+ /**
+ * Topic
+ */
+ protected String topic;
+
+ /**
+ * event cache
+ */
+ protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+
+ /**
+ * Am I running?
+ * reflects invocation of start()/stop()
+ * !locked & start() => alive
+ * stop() => !alive
+ */
+ protected volatile boolean alive = false;
+
+ /**
+ * Am I locked?
+ * reflects invocation of lock()/unlock() operations
+ * locked => !alive (but not in the other direction necessarily)
+ * locked => !offer, !run, !start, !stop (but this last one is obvious
+ * since locked => !alive)
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * All my subscribers for new message notifications
+ */
+ protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
+
+ /**
+ * Instantiates a new Topic Base
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ *
+ * @return a Topic Base
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public TopicBase(List<String> servers, String topic) throws IllegalArgumentException {
+
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("Server(s) must be provided");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A Topic must be provided");
+ }
+
+ this.servers = servers;
+ this.topic = topic;
+ }
+
+ @Override
+ public void register(TopicListener topicListener)
+ throws IllegalArgumentException {
+
+ logger.info("{}: registering {}", this, topicListener);
+
+ synchronized(this) {
+ if (topicListener == null)
+ throw new IllegalArgumentException("TopicListener must be provided");
+
+ for (TopicListener listener: this.topicListeners) {
+ if (listener == topicListener) return;
+ }
+
+ this.topicListeners.add(topicListener);
+ }
+ }
+
+ @Override
+ public void unregister(TopicListener topicListener) {
+
+ logger.info("{}: unregistering {}", this, topicListener);
+
+ synchronized (this) {
+ if (topicListener == null)
+ throw new IllegalArgumentException("TopicListener must be provided");
+
+ this.topicListeners.remove(topicListener);
+ }
+ }
+
+ /**
+ * broadcast event to all listeners
+ *
+ * @param message the event
+ * @return true if all notifications are performed with no error, false otherwise
+ */
+ protected boolean broadcast(String message) {
+ List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
+
+ boolean success = true;
+ for (TopicListener topicListener: snapshotListeners) {
+ try {
+ topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
+ } catch (Exception e) {
+ logger.warn("{}: notification error @ {} because of {}",
+ this, topicListener, e.getMessage(), e);
+ success = false;
+ }
+ }
+ return success;
+ }
+
+ /**
+ * take a snapshot of current topic listeners
+ *
+ * @return the topic listeners
+ */
+ protected synchronized List<TopicListener> snapshotTopicListeners() {
+ @SuppressWarnings("unchecked")
+ List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
+ return listeners;
+ }
+
+ @Override
+ public boolean lock() {
+
+ logger.info("{}: locking", this);
+
+ synchronized (this) {
+ if (this.locked)
+ return true;
+
+ this.locked = true;
+ }
+
+ return this.stop();
+ }
+
+ @Override
+ public boolean unlock() {
+ logger.info("{}: unlocking", this);
+
+ synchronized(this) {
+ if (!this.locked)
+ return true;
+
+ this.locked = false;
+ }
+
+ try {
+ return this.start();
+ } catch (Exception e) {
+ logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ @Override
+ public List<String> getServers() {
+ return servers;
+ }
+
+ @Override
+ public synchronized String[] getRecentEvents() {
+ String[] events = new String[recentEvents.size()];
+ return recentEvents.toArray(events);
+ }
+
+
+ @Override
+ public String toString() {
+ return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size() + ", locked="
+ + locked + ", #topicListeners=" + topicListeners.size() + "]";
+ }
+}