diff options
Diffstat (limited to 'models-sim/policy-models-simulators/src/main')
-rw-r--r-- | models-sim/policy-models-simulators/src/main/java/org/onap/policy/models/simulators/Main.java | 65 |
1 files changed, 36 insertions, 29 deletions
diff --git a/models-sim/policy-models-simulators/src/main/java/org/onap/policy/models/simulators/Main.java b/models-sim/policy-models-simulators/src/main/java/org/onap/policy/models/simulators/Main.java index 8333800f3..a0b165564 100644 --- a/models-sim/policy-models-simulators/src/main/java/org/onap/policy/models/simulators/Main.java +++ b/models-sim/policy-models-simulators/src/main/java/org/onap/policy/models/simulators/Main.java @@ -22,7 +22,9 @@ package org.onap.policy.models.simulators; import java.io.FileNotFoundException; import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import lombok.AccessLevel; @@ -85,9 +87,8 @@ public class Main extends ServiceManagerContainer { AtomicReference<DmaapSimProvider> provRef = new AtomicReference<>(); addAction(provName, () -> provRef.set(buildDmaapProvider(dmaapProv)), () -> provRef.get().shutdown()); - // @formatter:off - // REST server simulators + // @formatter:off for (ClassRestServerParameters restsim : params.getRestServers()) { AtomicReference<HttpServletServer> ref = new AtomicReference<>(); addAction(restsim.getName(), @@ -98,23 +99,30 @@ public class Main extends ServiceManagerContainer { // NOTE: topics must be started AFTER the (dmaap) rest servers // topic sinks - AtomicReference<List<TopicSink>> sinkRef = new AtomicReference<>(); - addAction("topic sinks", () -> sinkRef.set(buildSinks(params.getTopicSinks())), - () -> shutdownSinks(sinkRef.get())); + Map<String, TopicSink> sinks = new HashMap<>(); + for (TopicParameters topicParams : params.getTopicSinks()) { + String topic = topicParams.getTopic(); + addAction("Sink " + topic, + () -> sinks.put(topic, startSink(topicParams)), + () -> sinks.get(topic).shutdown()); + } // topic sources - AtomicReference<List<TopicSource>> sourceRef = new AtomicReference<>(); - addAction("topic sources", () -> sourceRef.set(buildSources(params.getTopicSources())), - () -> shutdownSources(sourceRef.get())); + Map<String, TopicSource> sources = new HashMap<>(); + for (TopicParameters topicParams : params.getTopicSources()) { + String topic = topicParams.getTopic(); + addAction("Source " + topic, + () -> sources.put(topic, startSource(topicParams)), + () -> sources.get(topic).shutdown()); + } // topic server simulators for (TopicServerParameters topicsim : params.getTopicServers()) { AtomicReference<TopicServer<?>> ref = new AtomicReference<>(); addAction(topicsim.getName(), - () -> ref.set(buildTopicServer(topicsim, sinkRef.get(), sourceRef.get())), + () -> ref.set(buildTopicServer(topicsim, sinks, sources)), () -> ref.get().shutdown()); } - // @formatter:on } @@ -164,20 +172,16 @@ public class Main extends ServiceManagerContainer { return prov; } - protected List<TopicSink> buildSinks(List<TopicParameters> params) { - return TopicEndpointManager.getManager().addTopicSinks(params); + private TopicSink startSink(TopicParameters params) { + TopicSink sink = TopicEndpointManager.getManager().addTopicSinks(List.of(params)).get(0); + sink.start(); + return sink; } - private void shutdownSinks(List<TopicSink> sinks) { - sinks.forEach(TopicSink::shutdown); - } - - protected List<TopicSource> buildSources(List<TopicParameters> params) { - return TopicEndpointManager.getManager().addTopicSources(params); - } - - private void shutdownSources(List<TopicSource> sources) { - sources.forEach(TopicSource::shutdown); + private TopicSource startSource(TopicParameters params) { + TopicSource source = TopicEndpointManager.getManager().addTopicSources(List.of(params)).get(0); + source.start(); + return source; } private HttpServletServer buildRestServer(String dmaapName, ClassRestServerParameters params) { @@ -201,17 +205,20 @@ public class Main extends ServiceManagerContainer { } } - private TopicServer<?> buildTopicServer(TopicServerParameters params, List<TopicSink> sinks, - List<TopicSource> sources) { + private TopicServer<?> buildTopicServer(TopicServerParameters params, Map<String, TopicSink> sinks, + Map<String, TopicSource> sources) { try { // find the desired sink - TopicSink sink = sinks.stream().filter(sink2 -> sink2.getTopic().equals(params.getSink())).findAny() - .orElseThrow(() -> new IllegalArgumentException("invalid sink topic " + params.getSink())); + TopicSink sink = sinks.get(params.getSink()); + if (sink == null) { + throw new IllegalArgumentException("invalid sink topic " + params.getSink()); + } // find the desired source - TopicSource source = sources.stream().filter(source2 -> source2.getTopic().equals(params.getSource())) - .findAny().orElseThrow(() -> new IllegalArgumentException( - "invalid source topic " + params.getSource())); + TopicSource source = sources.get(params.getSource()); + if (source == null) { + throw new IllegalArgumentException("invalid source topic " + params.getSource()); + } // create the topic server return (TopicServer<?>) Class.forName(params.getProviderClass()) |