From f14578b9c5d9a98cceb5342d69b0326289b1690c Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Sat, 27 Jul 2019 08:20:17 -0400 Subject: Add addTopic(List) New services are typically configured using TopicParameters. These are converted into Properties before being passed to TopicEndpoint.addTopic(), which then converts them into BusTopicParams before configuring a given topic. Added TopicEndpoint.addTopic(List), which makes it possible to configure the topics without going into the intermediate Properties. Furthermore, because TopicParams is a subclass of BusTopicParams, no conversion is needed for that either, so the TopicParams can be passed directly into the configuration classes. Incorporated changes from review. Change-Id: Id87e2c6812e36ae1a3ac680e6b35208667971782 Issue-ID: POLICY-1953 Signed-off-by: Jim Hahn --- .../common/endpoints/event/comm/TopicEndpoint.java | 34 ++++++++- .../endpoints/event/comm/TopicEndpointProxy.java | 84 +++++++++++++++++++--- .../event/comm/bus/NoopTopicSourceFactory.java | 11 ++- .../endpoints/event/comm/bus/TopicBaseFactory.java | 9 +++ .../event/comm/bus/TopicBaseHashedFactory.java | 9 +++ .../endpoints/parameters/TopicParameterGroup.java | 6 +- 6 files changed, 140 insertions(+), 13 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 7bc7abab..bb707523 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -30,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -46,24 +48,52 @@ public interface TopicEndpoint extends Startable, Lockable { */ List addTopics(Properties properties); + /** + * Add topics configuration (sources and sinks) into a single list. + * + * @param params parameters to configure topic + * @return topic list + * @throws IllegalArgumentException when invalid arguments are provided + */ + List addTopics(TopicParameterGroup params); + /** * Add Topic Sources to the communication infrastructure initialized per properties. * * @param properties properties for Topic Source construction - * @return a generic Topic Source + * @return a list of generic Topic Sources * @throws IllegalArgumentException when invalid arguments are provided */ List addTopicSources(Properties properties); + + /** + * Add Topic Sources to the communication infrastructure initialized per properties. + * + * @param paramList parameters for Topic Source construction + * @return a list of generic Topic Sources + * @throws IllegalArgumentException when invalid arguments are provided + */ + List addTopicSources(List paramList); + /** * Add Topic Sinks to the communication infrastructure initialized per properties. * * @param properties properties for Topic Sink construction - * @return a generic Topic Sink + * @return a list of generic Topic Sinks * @throws IllegalArgumentException when invalid arguments are provided */ List addTopicSinks(Properties properties); + /** + * Add Topic Sinks to the communication infrastructure initialized per properties. + * + * @param paramList parameters for Topic Sink construction + * @return a list of generic Topic Sinks + * @throws IllegalArgumentException when invalid arguments are provided + */ + List addTopicSinks(List paramList); + /** * Gets all Topic Sources. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 00980fc4..c9e73152 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -34,6 +34,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +67,41 @@ class TopicEndpointProxy implements TopicEndpoint { return topics; } + @Override + public List addTopics(TopicParameterGroup params) { + List topics = new ArrayList<>(params.getTopicSinks().size() + params.getTopicSources().size()); + topics.addAll(addTopicSources(params.getTopicSources())); + topics.addAll(addTopicSinks(params.getTopicSinks())); + return topics; + } + + @Override + public List addTopicSources(List paramList) { + List sources = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sources.add(UebTopicFactories.getSourceFactory().build(param)); + break; + case DMAAP: + sources.add(DmaapTopicFactories.getSourceFactory().build(param)); + break; + case NOOP: + sources.add(NoopTopicFactories.getSourceFactory().build(param)); + break; + default: + logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; + } + } + + lockSources(sources); + + return sources; + } + @Override public List addTopicSources(Properties properties) { @@ -78,13 +115,42 @@ class TopicEndpointProxy implements TopicEndpoint { sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); + lockSources(sources); + + return sources; + } + + private void lockSources(List sources) { if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); + sources.forEach(TopicSource::lock); + } + } + + @Override + public List addTopicSinks(List paramList) { + List sinks = new ArrayList<>(paramList.size()); + + for (TopicParameters param : paramList) { + switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case UEB: + sinks.add(UebTopicFactories.getSinkFactory().build(param)); + break; + case DMAAP: + sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); + break; + case NOOP: + sinks.add(NoopTopicFactories.getSinkFactory().build(param)); + break; + default: + logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), + param.getTopic()); + break; } } - return sources; + lockSinks(sinks); + + return sinks; } @Override @@ -99,15 +165,17 @@ class TopicEndpointProxy implements TopicEndpoint { sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } + lockSinks(sinks); return sinks; } + private void lockSinks(List sinks) { + if (this.isLocked()) { + sinks.forEach(TopicSink::lock); + } + } + @Override public List getTopicSources() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java index 9623b4fa..7164f919 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSourceFactory.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; /** @@ -36,6 +37,14 @@ public class NoopTopicSourceFactory extends NoopTopicFactory { return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS; } + /** + * {@inheritDoc}. + */ + @Override + public NoopTopicSource build(BusTopicParams param) { + return build(param.getServers(), param.getTopic()); + } + /** * {@inheritDoc}. */ @@ -49,6 +58,6 @@ public class NoopTopicSourceFactory extends NoopTopicFactory { */ @Override public String toString() { - return "NoopTopicSourceFactory[" + super.toString() + "]"; + return "NoopTopicSourceFactory [" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java index 897a8e19..a6b5b1d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseFactory.java @@ -23,6 +23,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.List; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** * Topic Base Factory. @@ -49,6 +50,14 @@ public interface TopicBaseFactory { */ T build(List servers, String topic, boolean managed); + /** + * Construct an instance of an endpoint. + * + * @param param parameters + * @return an instance of T. + */ + T build(BusTopicParams param); + /** * destroy TopicBase instance. * @param topic topic. diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java index f958bd01..38ee25a4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** * Topic Factory implementation that indexes T instances in a hash table. @@ -99,6 +100,14 @@ public abstract class TopicBaseHashedFactory implements TopicBa return newEndpoints; } + /** + * {@inheritDoc}. + */ + @Override + public T build(BusTopicParams param) { + return this.build(param.getServers(), param.getTopic(), param.isManaged()); + } + /** * {@inheritDoc}. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java index 70a06036..f633b0ef 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,7 @@ package org.onap.policy.common.endpoints.parameters; +import java.util.LinkedList; import java.util.List; import lombok.Getter; import lombok.Setter; @@ -41,8 +43,8 @@ import org.onap.policy.common.parameters.annotations.NotNull; @Setter public class TopicParameterGroup extends ParameterGroupImpl { - private List topicSources; - private List topicSinks; + private final List topicSources = new LinkedList<>(); + private final List topicSinks = new LinkedList<>(); public TopicParameterGroup() { super(TopicParameterGroup.class.getSimpleName()); -- cgit 1.2.3-korg