diff options
Diffstat (limited to 'policy-endpoints')
5 files changed, 30 insertions, 21 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java index c3d02d14..4c96f9be 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java @@ -282,12 +282,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { logger.warn("No topic for DMAAP Sink " + properties); return new ArrayList<DmaapTopicSink>(); } - List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<DmaapTopicSink>(); synchronized(this) { - List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>(); for (String topic: writeTopicList) { - + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + continue; + } String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); @@ -410,9 +413,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); - dmaapTopicWriters.add(dmaapTopicSink); + newDmaapTopicSinks.add(dmaapTopicSink); } - return dmaapTopicWriters; + return newDmaapTopicSinks; } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java index 9f60556c..a54cb6f4 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java @@ -326,6 +326,10 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>(); synchronized(this) { for (String topic: readTopicList) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSource_s.add(this.dmaapTopicSources.get(topic)); + continue; + } String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java index 432f035c..4dc38f18 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java @@ -171,8 +171,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { * {@inheritDoc} */ @Override - public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { - + public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { return this.build(servers, topic, null, null, null, true, false, false); } @@ -188,11 +187,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { logger.warn("No topic for UEB Sink " + properties); return new ArrayList<UebTopicSink>(); } - List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<UebTopicSink> newUebTopicSinks = new ArrayList<UebTopicSink>(); synchronized(this) { - List<UebTopicSink> uebTopicWriters = new ArrayList<UebTopicSink>(); for (String topic: writeTopicList) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + continue; + } String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + @@ -243,9 +246,9 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed, useHttps, allowSelfSignedCerts); - uebTopicWriters.add(uebTopicWriter); + newUebTopicSinks.add(uebTopicWriter); } - return uebTopicWriters; + return newUebTopicSinks; } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java index 1729576f..474d4a80 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java @@ -208,9 +208,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { } List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>(); + List<UebTopicSource> newUebTopicSources = new ArrayList<UebTopicSource>(); synchronized(this) { for (String topic: readTopicList) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + continue; + } String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + @@ -292,10 +296,10 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); - uebTopicSources.add(uebTopicSource); + newUebTopicSources.add(uebTopicSource); } } - return uebTopicSources; + return newUebTopicSources; } /** diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java index a34d361b..6ea21575 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -179,14 +179,12 @@ public interface BusConsumer { * @param fetchLimit Fetch Limit * @throws MalformedURLException */ - @SuppressWarnings("unchecked") public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) - - throws MalformedURLException { + int fetchTimeout, int fetchLimit, boolean useHttps) + throws MalformedURLException { this.fetchTimeout = fetchTimeout; @@ -201,9 +199,6 @@ public interface BusConsumer { this.consumer.setUsername(username); this.consumer.setPassword(password); - - - } /** |