diff options
Diffstat (limited to 'policy-endpoints/src/main')
6 files changed, 140 insertions, 13 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 7bc7abab..bb707523 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -30,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -47,24 +49,52 @@ public interface TopicEndpoint extends Startable, Lockable { List<Topic> addTopics(Properties properties); /** + * Add topics configuration (sources and sinks) into a single list. + * + * @param params parameters to configure topic + * @return topic list + * @throws IllegalArgumentException when invalid arguments are provided + */ + List<Topic> addTopics(TopicParameterGroup params); + + /** * Add Topic Sources to the communication infrastructure initialized per properties. * * @param properties properties for Topic Source construction - * @return a generic Topic Source + * @return a list of generic Topic Sources * @throws IllegalArgumentException when invalid arguments are provided */ List<TopicSource> addTopicSources(Properties properties); + + /** + * Add Topic Sources to the communication infrastructure initialized per properties. + * + * @param paramList parameters for Topic Source construction + * @return a list of generic Topic Sources + * @throws IllegalArgumentException when invalid arguments are provided + */ + List<TopicSource> addTopicSources(List<TopicParameters> paramList); + /** * Add Topic Sinks to the communication infrastructure initialized per properties. * * @param properties properties for Topic Sink construction - * @return a generic Topic Sink + * @return a list of generic Topic Sinks * @throws IllegalArgumentException when invalid arguments are provided */ List<TopicSink> addTopicSinks(Properties properties); /** + * Add Topic Sinks to the communication infrastructure initialized per properties. + * + * @param paramList parameters for Topic Sink construction + * @return a list of generic Topic Sinks + * @throws IllegalArgumentException when invalid arguments are provided + */ + List<TopicSink> addTopicSinks(List<TopicParameters> paramList); + + /** * Gets all Topic Sources. * * @return the Topic Source List diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 00980fc4..c9e73152 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -34,6 +34,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,41 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public List<Topic> addTopics(TopicParameterGroup params) { + List<Topic> topics = new ArrayList<>(params.getTopicSinks().size() + params.getTopicSources().size()); + topics.addAll(addTopicSources(params.getTopicSources())); + topics.addAll(addTopicSinks(params.getTopicSinks())); + return topics; + } + + @Override + public List<TopicSource> addTopicSources(List<TopicParameters> paramList) { + List<TopicSource> sources = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sources.add(UebTopicFactories.getSourceFactory().build(param)); + break; + case DMAAP: + sources.add(DmaapTopicFactories.getSourceFactory().build(param)); + break; + case NOOP: + sources.add(NoopTopicFactories.getSourceFactory().build(param)); + break; + default: + logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; + } + } + + lockSources(sources); + + return sources; + } + + @Override public List<TopicSource> addTopicSources(Properties properties) { // 1. Create UEB Sources @@ -78,13 +115,42 @@ class TopicEndpointProxy implements TopicEndpoint { sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); + lockSources(sources); + + return sources; + } + + private void lockSources(List<TopicSource> sources) { if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); + sources.forEach(TopicSource::lock); + } + } + + @Override + public List<TopicSink> addTopicSinks(List<TopicParameters> paramList) { + List<TopicSink> sinks = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sinks.add(UebTopicFactories.getSinkFactory().build(param)); + break; + case DMAAP: + sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); + break; + case NOOP: + sinks.add(NoopTopicFactories.getSinkFactory().build(param)); + break; + default: + logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; } } - return sources; + lockSinks(sinks); + + return sinks; } @Override @@ -99,15 +165,17 @@ class TopicEndpointProxy implements TopicEndpoint { sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } + lockSinks(sinks); return sinks; } + private void lockSinks(List<TopicSink> sinks) { + if (this.isLocked()) { + sinks.forEach(TopicSink::lock); + } + } + @Override public List<TopicSource> getTopicSources() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java index 9623b4fa..7164f919 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; /** @@ -40,6 +41,14 @@ public class NoopTopicSourceFactory extends NoopTopicFactory<NoopTopicSource> { * {@inheritDoc}. */ @Override + public NoopTopicSource build(BusTopicParams param) { + return build(param.getServers(), param.getTopic()); + } + + /** + * {@inheritDoc}. + */ + @Override protected NoopTopicSource build(List<String> servers, String topic) { return new NoopTopicSource(servers, topic); } @@ -49,6 +58,6 @@ public class NoopTopicSourceFactory extends NoopTopicFactory<NoopTopicSource> { */ @Override public String toString() { - return "NoopTopicSourceFactory[" + super.toString() + "]"; + return "NoopTopicSourceFactory [" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java index 897a8e19..a6b5b1d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java @@ -23,6 +23,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** * Topic Base Factory. @@ -50,6 +51,14 @@ public interface TopicBaseFactory<T extends Topic> { T build(List<String> servers, String topic, boolean managed); /** + * Construct an instance of an endpoint. + * + * @param param parameters + * @return an instance of T. + */ + T build(BusTopicParams param); + + /** * destroy TopicBase instance. * @param topic topic. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java index f958bd01..38ee25a4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** * Topic Factory implementation that indexes T instances in a hash table. @@ -103,6 +104,14 @@ public abstract class TopicBaseHashedFactory<T extends Topic> implements TopicBa * {@inheritDoc}. */ @Override + public T build(BusTopicParams param) { + return this.build(param.getServers(), param.getTopic(), param.isManaged()); + } + + /** + * {@inheritDoc}. + */ + @Override public T build(List<String> servers, String topic, boolean managed) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException(MISSING_SERVERS_MESSAGE); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java index b89f47dc..eab9c12f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,7 @@ package org.onap.policy.common.endpoints.parameters; +import java.util.LinkedList; import java.util.List; import lombok.Getter; import lombok.Setter; @@ -41,8 +43,8 @@ import org.onap.policy.common.parameters.annotations.NotNull; @Setter public class TopicParameterGroup extends ParameterGroupImpl { - private List<TopicParameters> topicSources; - private List<TopicParameters> topicSinks; + private final List<TopicParameters> topicSources = new LinkedList<>(); + private final List<TopicParameters> topicSinks = new LinkedList<>(); public TopicParameterGroup() { super(TopicParameterGroup.class.getSimpleName()); |