From 9f8be335925299059992bd46285a1b633d518051 Mon Sep 17 00:00:00 2001 From: mmis Date: Mon, 30 Jul 2018 17:04:50 +0100 Subject: Copy policy-endpoints from drools-pdp to common Removed changes made in commit b40acf2d244058c162a8597968e59f2708e6abf4 that went beyond the scope of POLICY-967 Issue-ID: POLICY-967 Change-Id: Ibbf78540dec8bf8601a62dacc8c7056d43f70ba1 Signed-off-by: mmis --- .../event/comm/bus/NoopTopicSinkFactory.java | 149 +++++++++++++++++++++ 1 file changed, 149 insertions(+) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java index c555d94f..ee1672d7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -20,9 +20,16 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Noop Topic Sink Factory */ @@ -80,3 +87,145 @@ public interface NoopTopicSinkFactory { */ public void destroy(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of noop sinks + */ +class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap noopTopicSinks = new HashMap<>(); + + @Override + public List build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List newSinks = new ArrayList<>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + servers = "noop"; + } + + final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List servers, String topic, boolean managed) { + + List noopSinkServers = servers; + if (noopSinkServers == null) { + noopSinkServers = new ArrayList<>(); + } + + if (noopSinkServers.isEmpty()) { + noopSinkServers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); + + if (managed) { + this.noopTopicSinks.put(topic, sink); + } + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } +} -- cgit 1.2.3-korg