diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java | 84 |
1 files changed, 76 insertions, 8 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 00980fc4..c9e73152 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -34,6 +34,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,41 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public List<Topic> addTopics(TopicParameterGroup params) { + List<Topic> topics = new ArrayList<>(params.getTopicSinks().size() + params.getTopicSources().size()); + topics.addAll(addTopicSources(params.getTopicSources())); + topics.addAll(addTopicSinks(params.getTopicSinks())); + return topics; + } + + @Override + public List<TopicSource> addTopicSources(List<TopicParameters> paramList) { + List<TopicSource> sources = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sources.add(UebTopicFactories.getSourceFactory().build(param)); + break; + case DMAAP: + sources.add(DmaapTopicFactories.getSourceFactory().build(param)); + break; + case NOOP: + sources.add(NoopTopicFactories.getSourceFactory().build(param)); + break; + default: + logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; + } + } + + lockSources(sources); + + return sources; + } + + @Override public List<TopicSource> addTopicSources(Properties properties) { // 1. Create UEB Sources @@ -78,13 +115,42 @@ class TopicEndpointProxy implements TopicEndpoint { sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); + lockSources(sources); + + return sources; + } + + private void lockSources(List<TopicSource> sources) { if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); + sources.forEach(TopicSource::lock); + } + } + + @Override + public List<TopicSink> addTopicSinks(List<TopicParameters> paramList) { + List<TopicSink> sinks = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sinks.add(UebTopicFactories.getSinkFactory().build(param)); + break; + case DMAAP: + sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); + break; + case NOOP: + sinks.add(NoopTopicFactories.getSinkFactory().build(param)); + break; + default: + logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; } } - return sources; + lockSinks(sinks); + + return sinks; } @Override @@ -99,15 +165,17 @@ class TopicEndpointProxy implements TopicEndpoint { sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } + lockSinks(sinks); return sinks; } + private void lockSinks(List<TopicSink> sinks) { + if (this.isLocked()) { + sinks.forEach(TopicSink::lock); + } + } + @Override public List<TopicSource> getTopicSources() { |