summaryrefslogtreecommitdiffstats
path: root/policy-endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java13
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java4
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java15
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java10
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java9
5 files changed, 30 insertions, 21 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
index c3d02d14..4c96f9be 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
@@ -282,12 +282,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
logger.warn("No topic for DMAAP Sink " + properties);
return new ArrayList<DmaapTopicSink>();
}
- List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+ List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+ List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<DmaapTopicSink>();
synchronized(this) {
- List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>();
for (String topic: writeTopicList) {
-
+ if (this.dmaapTopicWriters.containsKey(topic)) {
+ newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
+ continue;
+ }
String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." +
topic +
PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
@@ -410,9 +413,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
dme2Partner, dme2Latitude, dme2Longitude,
dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
- dmaapTopicWriters.add(dmaapTopicSink);
+ newDmaapTopicSinks.add(dmaapTopicSink);
}
- return dmaapTopicWriters;
+ return newDmaapTopicSinks;
}
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
index 9f60556c..a54cb6f4 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
@@ -326,6 +326,10 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
synchronized(this) {
for (String topic: readTopicList) {
+ if (this.dmaapTopicSources.containsKey(topic)) {
+ dmaapTopicSource_s.add(this.dmaapTopicSources.get(topic));
+ continue;
+ }
String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." +
topic +
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
index 432f035c..4dc38f18 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
@@ -171,8 +171,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
* {@inheritDoc}
*/
@Override
- public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
-
+ public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
return this.build(servers, topic, null, null, null, true, false, false);
}
@@ -188,11 +187,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
logger.warn("No topic for UEB Sink " + properties);
return new ArrayList<UebTopicSink>();
}
- List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+ List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+ List<UebTopicSink> newUebTopicSinks = new ArrayList<UebTopicSink>();
synchronized(this) {
- List<UebTopicSink> uebTopicWriters = new ArrayList<UebTopicSink>();
for (String topic: writeTopicList) {
+ if (this.uebTopicSinks.containsKey(topic)) {
+ newUebTopicSinks.add(this.uebTopicSinks.get(topic));
+ continue;
+ }
String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." +
topic +
@@ -243,9 +246,9 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
UebTopicSink uebTopicWriter = this.build(serverList, topic,
apiKey, apiSecret,
partitionKey, managed, useHttps, allowSelfSignedCerts);
- uebTopicWriters.add(uebTopicWriter);
+ newUebTopicSinks.add(uebTopicWriter);
}
- return uebTopicWriters;
+ return newUebTopicSinks;
}
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
index 1729576f..474d4a80 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
@@ -208,9 +208,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
}
List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));
- List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>();
+ List<UebTopicSource> newUebTopicSources = new ArrayList<UebTopicSource>();
synchronized(this) {
for (String topic: readTopicList) {
+ if (this.uebTopicSources.containsKey(topic)) {
+ newUebTopicSources.add(this.uebTopicSources.get(topic));
+ continue;
+ }
String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
topic +
@@ -292,10 +296,10 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
apiKey, apiSecret,
consumerGroup, consumerInstance,
fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
- uebTopicSources.add(uebTopicSource);
+ newUebTopicSources.add(uebTopicSource);
}
}
- return uebTopicSources;
+ return newUebTopicSources;
}
/**
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
index a34d361b..6ea21575 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -179,14 +179,12 @@ public interface BusConsumer {
* @param fetchLimit Fetch Limit
* @throws MalformedURLException
*/
- @SuppressWarnings("unchecked")
public DmaapConsumerWrapper(List<String> servers, String topic,
String apiKey, String apiSecret,
String username, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps)
-
- throws MalformedURLException {
+ int fetchTimeout, int fetchLimit, boolean useHttps)
+ throws MalformedURLException {
this.fetchTimeout = fetchTimeout;
@@ -201,9 +199,6 @@ public interface BusConsumer {
this.consumer.setUsername(username);
this.consumer.setPassword(password);
-
-
-
}
/**