diff options
author | Pamela Dragosh <pdragosh@research.att.com> | 2019-10-07 23:59:30 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-10-07 23:59:30 +0000 |
commit | 3b44a9887ab7ba8d0ba4f1159e2fc0a93c0247d0 (patch) | |
tree | 52e909b5acc1d725b956416f4e4121ff5e7b99bf | |
parent | 1d0d9ebabda67d6c770b4854a8154763aa6e75d6 (diff) | |
parent | aa148d9b5bba6ad23736e939a6d0ec917e761e1e (diff) |
Merge "Flesh out DMaaP simulator"
41 files changed, 2972 insertions, 416 deletions
diff --git a/models-interactions/model-simulators/pom.xml b/models-interactions/model-simulators/pom.xml index b0c48ebda..7d64200e9 100644 --- a/models-interactions/model-simulators/pom.xml +++ b/models-interactions/model-simulators/pom.xml @@ -73,5 +73,10 @@ <artifactId>policy-models-decisions</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.onap.policy.models.sim</groupId> + <artifactId>policy-models-sim-dmaap</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> diff --git a/models-interactions/model-simulators/src/main/java/org/onap/policy/simulators/Util.java b/models-interactions/model-simulators/src/main/java/org/onap/policy/simulators/Util.java index a1d28ba23..6c1a05753 100644 --- a/models-interactions/model-simulators/src/main/java/org/onap/policy/simulators/Util.java +++ b/models-interactions/model-simulators/src/main/java/org/onap/policy/simulators/Util.java @@ -22,11 +22,18 @@ package org.onap.policy.simulators; import java.io.IOException; - +import java.util.Properties; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.GsonMessageBodyHandler; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.common.utils.resources.ResourceUtils; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; +import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer; public class Util { public static final String AAISIM_SERVER_NAME = "aaiSim"; @@ -40,6 +47,7 @@ public class Util { public static final int VFCSIM_SERVER_PORT = 6668; public static final int GUARDSIM_SERVER_PORT = 6669; public static final int SDNCSIM_SERVER_PORT = 6670; + public static final int DMAAPSIM_SERVER_PORT = 3904; private static final String CANNOT_CONNECT = "cannot connect to port "; private static final String LOCALHOST = "localhost"; @@ -139,4 +147,34 @@ public class Util { } return testServer; } + + /** + * Build a DMaaP simulator. + * + * @return the simulator + * @throws InterruptedException if a thread is interrupted + * @throws IOException if an IO errror occurs + * @throws CoderException if the server parameters cannot be loaded + */ + public static HttpServletServer buildDmaapSim() throws InterruptedException, IOException, CoderException { + String json = ResourceUtils.getResourceAsString("org/onap/policy/simulators/dmaap/DmaapParameters.json"); + DmaapSimParameterGroup params = new StandardCoder().decode(json, DmaapSimParameterGroup.class); + + DmaapSimProvider.setInstance(new DmaapSimProvider(params)); + + Properties props = DmaapSimRestServer.getServerProperties(params.getRestServerParameters()); + + final String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + params.getRestServerParameters().getName(); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, + Integer.toString(DMAAPSIM_SERVER_PORT)); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + HttpServletServer testServer = HttpServletServerFactoryInstance.getServerFactory().build(props).get(0); + testServer.waitedStart(5000); + if (!NetworkUtil.isTcpPortOpen(LOCALHOST, testServer.getPort(), 50, 1000L)) { + throw new IllegalStateException(CANNOT_CONNECT + testServer.getPort()); + } + return testServer; + } } diff --git a/models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json b/models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json new file mode 100644 index 000000000..b704f6f14 --- /dev/null +++ b/models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json @@ -0,0 +1,8 @@ +{ + "name": "DMaapSim", + "topicSweepSec": 300, + "restServerParameters": { + "host": "0.0.0.0", + "port": 3904 + } +} diff --git a/models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java b/models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java new file mode 100644 index 000000000..50e9bad5b --- /dev/null +++ b/models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.simulators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.utils.coder.StandardCoder; + +public class DmaapSimulatorTest { + private static final int MAX_WAIT_SEC = 2; + private static final String TOPIC = "MY-TOPIC"; + + /** + * Messages from the topic are placed here by the endpoint. + */ + private BlockingQueue<String> queue; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TopicEndpointManager.getManager().shutdown(); + } + + /** + * Starts the simulator and the topic. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + assertNotNull(Util.buildDmaapSim()); + + String topicJson = new String(Files.readAllBytes( + new File("src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json").toPath()), + StandardCharsets.UTF_8); + topicJson = topicJson.replace("${port}", String.valueOf(Util.DMAAPSIM_SERVER_PORT)); + + TopicParameterGroup topicConfig = new StandardCoder().decode(topicJson, TopicParameterGroup.class); + + TopicEndpointManager.getManager().addTopics(topicConfig); + TopicEndpointManager.getManager().start(); + + queue = new LinkedBlockingQueue<>(); + } + + @After + public void tearDown() { + TopicEndpointManager.getManager().shutdown(); + HttpServletServerFactoryInstance.getServerFactory().destroy(); + } + + @Test + public void test() throws InterruptedException { + TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC) + .register((infra, topic, event) -> queue.add(event)); + + DmaapTopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC); + sink.send("hello"); + sink.send("world"); + + assertEquals("hello", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + assertEquals("world", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + } +} diff --git a/models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json b/models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json new file mode 100644 index 000000000..ba1f4806f --- /dev/null +++ b/models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json @@ -0,0 +1,21 @@ +{ + "topicSources": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap", + "fetchTimeout": 100 + } + ], + "topicSinks": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap" + } + ] +}
\ No newline at end of file diff --git a/models-sim/models-sim-dmaap/pom.xml b/models-sim/models-sim-dmaap/pom.xml index c1981e6be..e3f0f0b27 100644 --- a/models-sim/models-sim-dmaap/pom.xml +++ b/models-sim/models-sim-dmaap/pom.xml @@ -1,6 +1,7 @@ <!-- ============LICENSE_START======================================================= Copyright (C) 2019 Nordix Foundation. + Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -56,6 +57,17 @@ <artifactId>gson</artifactId> <version>${policy.common.version}</version> </dependency> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>utils-test</artifactId> + <version>${policy.common.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java index caae287b8..11da57582 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,8 @@ package org.onap.policy.models.sim.dmaap.parameters; import lombok.Getter; - import org.onap.policy.common.parameters.ParameterGroupImpl; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; @@ -36,6 +37,14 @@ public class DmaapSimParameterGroup extends ParameterGroupImpl { private RestServerParameters restServerParameters; /** + * Frequency, in seconds, with which to sweep the topics of idle consumers. On each + * sweep cycle, if a consumer group has had no new poll requests since the last sweep + * cycle, it is removed. + */ + @Min(1) + private long topicSweepSec; + + /** * Create the DMaaP simulator parameter group. * * @param name the parameter group name diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java index 8eb76ec00..252054504 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +39,7 @@ public class DmaapSimParameterHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimParameterHandler.class); - private static final Coder CODER = new StandardCoder(); + private final Coder coder = new StandardCoder(); /** * Read the parameters from the parameter file. @@ -54,7 +55,7 @@ public class DmaapSimParameterHandler { try { // Read the parameters from JSON File file = new File(arguments.getFullConfigurationFilePath()); - dmaapSimParameterGroup = CODER.decode(file, DmaapSimParameterGroup.class); + dmaapSimParameterGroup = coder.decode(file, DmaapSimParameterGroup.class); } catch (final CoderException e) { final String errorMessage = "error reading parameters from \"" + arguments.getConfigurationFilePath() + "\"\n" + "(" + e.getClass().getSimpleName() + "):" + e.getMessage(); diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java index c7269f66d..8fb2cffa3 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +22,14 @@ package org.onap.policy.models.sim.dmaap.parameters; import lombok.Getter; + import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Class to hold all parameters needed for DMaaP simulator rest server. + * Class to hold all parameters needed for rest server. */ @NotNull @NotBlank @@ -39,6 +41,6 @@ public class RestServerParameters extends ParameterGroupImpl { private int port; public RestServerParameters() { - super("RestServerParameters"); + super(RestServerParameters.class.getSimpleName()); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java new file mode 100644 index 000000000..737151339 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java @@ -0,0 +1,190 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data associated with a consumer group. All consumer instances within a group share the + * same data object. + */ +public class ConsumerGroupData { + private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupData.class); + + /** + * Returned when messages can no longer be read from this consumer group object, + * because it is being removed from the topic. {@link #UNREADABLE_LIST} must not be + * the same list as Collections.emptyList(), thus we wrap it. + */ + public static final List<String> UNREADABLE_LIST = Collections.unmodifiableList(Collections.emptyList()); + + /** + * Returned when there are no messages read. Collections.emptyList() is already + * unmodifiable, thus no need to wrap it. + */ + private static final List<String> EMPTY_LIST = Collections.emptyList(); + + /** + * This is locked while fields other than {@link #messageQueue} are updated. + */ + private final Object lockit = new Object(); + + /** + * Number of sweep cycles that have occurred since a consumer has attempted to read + * from the queue. This consumer group should be removed once this count exceeds + * {@code 1}, provided {@link #nreaders} is zero. + */ + private int nsweeps = 0; + + /** + * Number of consumers that are currently attempting to read from the queue. This + * consumer group should not be removed as long as this is non-zero. + */ + private int nreaders = 0; + + /** + * Message queue. + */ + private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(); + + + /** + * Constructs the object. + * + * @param topicName name of the topic with which this object is associated + * @param groupName name of the consumer group with which this object is associated + */ + public ConsumerGroupData(String topicName, String groupName) { + logger.info("Topic {}: add consumer group: {}", topicName, groupName); + } + + /** + * Determines if this consumer group should be removed. This should be invoked once + * during each sweep cycle. When this returns {@code true}, this consumer group should + * be immediately discarded, as any readers will sit in a spin loop waiting for it to + * be discarded. + * + * @return {@code true} if this consumer group should be removed, {@code false} + * otherwise + */ + public boolean shouldRemove() { + synchronized (lockit) { + return (nreaders == 0 && ++nsweeps > 1); + } + } + + /** + * Reads messages from the queue, blocking if necessary. + * + * @param maxRead maximum number of messages to read + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return a list of messages read from the queue, empty if no messages became + * available before the wait time elapsed, or {@link #UNREADABLE_LIST} if this + * consumer group object is no longer active + * @throws InterruptedException if this thread was interrupted while waiting for the + * first message + */ + public List<String> read(int maxRead, long waitMs) throws InterruptedException { + + synchronized (lockit) { + if (nsweeps > 1 && nreaders == 0) { + // cannot use this consumer group object anymore + return UNREADABLE_LIST; + } + + ++nreaders; + } + + /* + * Note: do EVERYTHING inside of the "try" block, so that the "finally" block can + * update the reader count. + * + * Do NOT hold the lockit while we're polling, as poll() may block for a while. + */ + try { + // always read at least one message + int maxRead2 = Math.max(1, maxRead); + long waitMs2 = Math.max(0, waitMs); + + // perform a blocking read of the queue + String obj = getNextMessage(waitMs2); + if (obj == null) { + return EMPTY_LIST; + } + + /* + * List should hold all messages from the queue PLUS the one we already have. + * Note: it's possible for additional messages to be added to the queue while + * we're reading from it. In that case, the list will grow as needed. + */ + List<String> lst = new ArrayList<>(Math.min(maxRead2, messageQueue.size() + 1)); + lst.add(obj); + + // perform NON-blocking read of subsequent messages + for (int x = 1; x < maxRead2; ++x) { + if ((obj = messageQueue.poll()) == null) { + break; + } + + lst.add(obj); + } + + return lst; + + } finally { + synchronized (lockit) { + --nreaders; + nsweeps = 0; + } + } + } + + /** + * Writes messages to the queue. + * + * @param messages messages to be written to the queue + */ + public void write(List<String> messages) { + messageQueue.addAll(messages); + } + + // the following methods may be overridden by junit tests + + /** + * Gets the next message from the queue. + * + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return the next message, or {@code null} if no messages became available before + * the wait time elapsed + * @throws InterruptedException if this thread was interrupted while waiting for the + * message + */ + protected String getNextMessage(long waitMs) throws InterruptedException { + return messageQueue.poll(waitMs, TimeUnit.MILLISECONDS); + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java index 9de29cdac..d11d1b397 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,18 +21,20 @@ package org.onap.policy.models.sim.dmaap.provider; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.tuple.MutablePair; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import javax.ws.rs.core.Response.Status; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@est.tech) */ -public class DmaapSimProvider { +public class DmaapSimProvider extends ServiceManagerContainer { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class); - // Recurring string constants - private static final String TOPIC_TAG = "Topic:"; + @Getter + @Setter + private static DmaapSimProvider instance; - // Time for a get to wait before checking of a message has come - private static final long DMAAP_SIM_WAIT_TIME = 50; + /** + * Maps a topic name to its data. + */ + private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>(); - // recurring constants - private static final String WITH_TIMEOUT = " with timeout "; + /** + * Thread used to remove idle consumers from the topics. + */ + private ScheduledExecutorService timerPool; - // The map of topic messages - private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>(); - // The map of topic messages - private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap = - new LinkedHashMap<>(); + /** + * Constructs the object. + * + * @param params parameters + */ + public DmaapSimProvider(DmaapSimParameterGroup params) { + addAction("Topic Sweeper", () -> { + timerPool = makeTimerPool(); + timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(), + TimeUnit.SECONDS); + }, () -> timerPool.shutdown()); + } /** * Process a DMaaP message. * - * @param topicName The topic name + * @param topicName the topic name * @param dmaapMessage the message to process * @return a response to the message */ + @SuppressWarnings("unchecked") public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) { - LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage); - - synchronized (topicMessageMap) { - SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName); - if (messageMap == null) { - messageMap = new TreeMap<>(); - topicMessageMap.put(topicName, messageMap); - LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map"); - } + LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage); - int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1); + List<Object> lst; - messageMap.put(nextKey, dmaapMessage); - LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); + if (dmaapMessage instanceof List) { + lst = (List<Object>) dmaapMessage; + } else { + lst = Collections.singletonList(dmaapMessage); } - return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build(); + TopicData topic = topic2data.get(topicName); + + /* + * Write all messages and return the count. If the topic doesn't exist yet, then + * there are no subscribers to receive the messages, thus treat it as if all + * messages were published. + */ + int nmessages = (topic != null ? topic.write(lst) : lst.size()); + + Map<String, Object> map = new LinkedHashMap<>(); + map.put("serverTimeMs", 0); + map.put("count", nmessages); + + return Response.status(Response.Status.OK).entity(map).build(); } /** @@ -92,102 +115,66 @@ public class DmaapSimProvider { * @param topicName The topic to wait on * @param consumerGroup the consumer group that is waiting * @param consumerId the consumer ID that is waiting - * @param timeout the length of time to wait for + * @param limit the maximum number of messages to get + * @param timeoutMs the length of time to wait for * @return the DMaaP message or */ public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId, - final int timeout) { + final int limit, final long timeoutMs) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); + LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup, + consumerId, limit, timeoutMs); - MutablePair<Integer, String> consumerGroupPair = null; - - synchronized (consumerGroupsMap) { - Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName); - if (consumerGroupMap == null) { - consumerGroupMap = new LinkedHashMap<>(); - consumerGroupsMap.put(topicName, consumerGroupMap); - LOGGER.trace( - TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup); - } - - consumerGroupPair = consumerGroupMap.get(consumerGroup); - if (consumerGroupPair == null) { - consumerGroupPair = new MutablePair<>(-1, consumerId); - consumerGroupMap.put(consumerGroup, consumerGroupPair); - LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group " - + consumerGroup + ":" + consumerId); - } - } + try { + List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit, + timeoutMs); - long timeOfTimeout = System.currentTimeMillis() + timeout; + if (lst.isEmpty() && timeoutMs > 0) { + LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId); + return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build(); - do { - Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair); - if (waitingMessages != null) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages); - return Response.status(Response.Status.OK).entity(waitingMessages).build(); + } else { + LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(), + consumerId); + return Response.status(Status.OK).entity(lst).build(); } - try { - TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME); - } catch (InterruptedException ie) { - String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID " - + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout; - LOGGER.warn(errorMessage, ie); - Thread.currentThread().interrupt(); - throw new DmaapSimRuntimeException(errorMessage, ie); - } + } catch (InterruptedException e) { + LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup, + consumerId, e); + Thread.currentThread().interrupt(); + return Response.status(Status.GONE).entity(Collections.emptyList()).build(); } - while (timeOfTimeout > System.currentTimeMillis()); - - LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); - return Response.status(Response.Status.REQUEST_TIMEOUT).build(); } /** - * Return any messages on this topic with a message number greater than the supplied message number. - * - * @param topicName the topic name to check - * @param consumerGroupPair the pair with the information on the last message retrieved - * @return the messages or null if there are none + * Task to remove idle consumers from each topic. */ - private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) { - String foundMessageList = "["; - - synchronized (topicMessageMap) { - SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName); - if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) { - return null; - } + private class SweeperTask implements Runnable { + @Override + public void run() { + topic2data.values().forEach(TopicData::removeIdleConsumers); + } + } - boolean first = true; - for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) { - if (first) { - first = false; - } else { - foundMessageList += ","; - } - try { - foundMessageList += new StandardCoder().encode(dmaapMessage); - } catch (CoderException ce) { - String errorMessage = "Encoding error on message on DMaaP topic " + topicName; - LOGGER.warn(errorMessage, ce); - return null; - } - } - foundMessageList += ']'; + // the following methods may be overridden by junit tests - LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() - + " to " + messageMap.lastKey()); - synchronized (consumerGroupsMap) { - consumerGroupPair.setLeft(messageMap.lastKey()); - } - } + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } - return (foundMessageList.length() < 3 ? null : foundMessageList); + /** + * Makes a new topic. + * + * @param topicName topic name + * @return a new topic + */ + protected TopicData makeTopicData(String topicName) { + return new TopicData(topicName); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java new file mode 100644 index 000000000..2737f455b --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java @@ -0,0 +1,201 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data associated with a topic. + * + * <p/> + * Note: for ease of implementation, this adds a topic when a consumer polls it rather + * than when a publisher writes to it. This is the opposite of how the real DMaaP works. + * As a result, this will never return a topic-not-found message to the consumer. + */ +public class TopicData { + private static final Logger logger = LoggerFactory.getLogger(TopicData.class); + + /** + * Name of the topic with which this data is associated. + */ + private final String topicName; + + /** + * Maps a consumer group name to its associated data. + */ + private final Map<String, ConsumerGroupData> group2data = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param topicName name of the topic with which this object is associated + */ + public TopicData(String topicName) { + logger.info("Topic {}: added", topicName); + this.topicName = topicName; + } + + /** + * Removes idle consumers from the topic. This is typically called once during each + * sweep cycle. + */ + public void removeIdleConsumers() { + Iterator<Entry<String, ConsumerGroupData>> iter = group2data.entrySet().iterator(); + while (iter.hasNext()) { + Entry<String, ConsumerGroupData> ent = iter.next(); + if (ent.getValue().shouldRemove()) { + /* + * We want the minimum amount of time to elapse between invoking + * shouldRemove() and iter.remove(), thus all other statements (e.g., + * logging) should be done AFTER iter.remove(). + */ + iter.remove(); + + logger.info("Topic {}: removed consumer group: {}", topicName, ent.getKey()); + } + } + } + + /** + * Reads from a particular consumer group's queue. + * + * @param consumerGroup name of the consumer group from which to read + * @param maxRead maximum number of messages to read + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return a list of messages read from the queue, empty if no messages became + * available before the wait time elapsed + * @throws InterruptedException if this thread was interrupted while waiting for the + * first message + */ + public List<String> read(String consumerGroup, int maxRead, long waitMs) throws InterruptedException { + /* + * It's possible that this thread may spin several times while waiting for + * removeIdleConsumers() to complete its call to iter.remove(), thus we create + * this closure once, rather than each time through the loop. + */ + Function<String, ConsumerGroupData> maker = this::makeData; + + // loop until we get a readable list + List<String> result; + + // @formatter:off + + do { + result = group2data.computeIfAbsent(consumerGroup, maker).read(maxRead, waitMs); + } + while (result == ConsumerGroupData.UNREADABLE_LIST); + + // @formatter:on + + return result; + } + + /** + * Writes messages to the queues of every consumer group. + * + * @param messages messages to be written to the queues + * @return the number of messages enqueued + */ + public int write(List<Object> messages) { + List<String> list = convertMessagesToStrings(messages); + + /* + * We don't care if a consumer group is deleted from the map while we're adding + * messages to it, as those messages will simply be ignored (and discarded by the + * garbage collector). + */ + for (ConsumerGroupData data : group2data.values()) { + data.write(list); + } + + return list.size(); + } + + /** + * Converts a list of message objects to a list of message strings. If a message + * cannot be converted for some reason, then it is not added to the result list, thus + * the result list may be shorted than the original input list. + * + * @param messages objects to be converted + * @return a list of message strings + */ + protected List<String> convertMessagesToStrings(List<Object> messages) { + Coder coder = new StandardCoder(); + List<String> list = new ArrayList<>(messages.size()); + + for (Object msg : messages) { + String str = convertMessageToString(msg, coder); + if (str != null) { + list.add(str); + } + } + + return list; + } + + /** + * Converts a message object to a message string. + * + * @param message message to be converted + * @param coder used to encode the message as a string + * @return the message string, or {@code null} if it cannot be converted + */ + protected String convertMessageToString(Object message, Coder coder) { + if (message == null) { + return null; + } + + if (message instanceof String) { + return message.toString(); + } + + try { + return coder.encode(message); + } catch (CoderException e) { + logger.warn("cannot encode {}", message, e); + return null; + } + } + + // this may be overridden by junit tests + + /** + * Makes data for a consumer group. + * + * @param consumerGroup name of the consumer group to make + * @return new consumer group data + */ + protected ConsumerGroupData makeData(String consumerGroup) { + return new ConsumerGroupData(topicName, consumerGroup); + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java index e269ac00b..dbd9422ef 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java @@ -1,66 +1,178 @@ -/* - * ============LICENSE_START======================================================= ONAP - * ================================================================================ Copyright (C) 2019 AT&T Intellectual - * Property. All rights reserved. ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.models.sim.dmaap.rest; import java.io.BufferedReader; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.Reader; import java.lang.annotation.Annotation; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.Provider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; /** - * Provider that serializes and de-serializes JSON via gson. + * Provider that decodes "application/cambria" messages. */ @Provider @Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA) -@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA) public class CambriaMessageBodyHandler implements MessageBodyReader<Object> { - // Media type for Cambria public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria"; - public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class); + /** + * Maximum length of a message or partition. + */ + private static final int MAX_LEN = 10000000; + + /** + * Maximum digits in a length field. + */ + private static final int MAX_DIGITS = 10; @Override public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()); + return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString())); } @Override - public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, String> httpHeaders, InputStream entityStream) - throws IOException { - - String cambriaString = ""; - try (BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(entityStream))) { - String line; - while ((line = bufferedReader.readLine()) != null) { - cambriaString += line; + public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException { + + try (BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) { + List<Object> messages = new LinkedList<>(); + String msg; + while ((msg = readMessage(bufferedReader)) != null) { + messages.add(msg); + } + + return messages; + } + } + + /** + * Reads a message. + * + * @param reader source from which to read + * @return the message that was read, or {@code null} if there are no more messages + * @throws IOException if an error occurs + */ + private String readMessage(Reader reader) throws IOException { + if (!skipWhitespace(reader)) { + return null; + } + + int partlen = readLength(reader); + if (partlen > MAX_LEN) { + throw new IOException("invalid partition length"); + } + + int msglen = readLength(reader); + if (msglen > MAX_LEN) { + throw new IOException("invalid message length"); + } + + // skip over the partition + reader.skip(partlen); + + return readString(reader, msglen); + } + + /** + * Skips whitespace. + * + * @param reader source from which to read + * @return {@code true} if there is another character after the whitespace, + * {@code false} if the end of the stream has been reached + * @throws IOException if an error occurs + */ + private boolean skipWhitespace(Reader reader) throws IOException { + int chr; + + do { + reader.mark(1); + if ((chr = reader.read()) < 0) { + return false; + } + } + while (Character.isWhitespace(chr)); + + // push the last character back onto the reader + reader.reset(); + + return true; + } + + /** + * Reads a length field, which is a number followed by ".". + * + * @param reader source from which to read + * @return the length, or -1 if EOF has been reached + * @throws IOException if an error occurs + */ + private int readLength(Reader reader) throws IOException { + StringBuilder bldr = new StringBuilder(MAX_DIGITS); + + int chr; + for (int x = 0; x < MAX_DIGITS; ++x) { + if ((chr = reader.read()) < 0) { + throw new EOFException("missing '.' in 'length' field"); } - return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length()); + if (chr == '.') { + String text = bldr.toString().trim(); + return (text.isEmpty() ? 0 : Integer.valueOf(text)); + } + + if (!Character.isDigit(chr)) { + throw new IOException("invalid character in 'length' field"); + } + + bldr.append((char) chr); } + + throw new IOException("too many digits in 'length' field"); + } + + /** + * Reads a string. + * + * @param reader source from which to read + * @param len length of the string (i.e., number of characters to read) + * @return the string that was read + * @throws IOException if an error occurs + */ + private String readString(Reader reader, int len) throws IOException { + char[] buf = new char[len]; + IOUtils.readFully(reader, buf); + + return new String(buf); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java index e3fdd4884..8339d0e64 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,24 +21,24 @@ package org.onap.policy.models.sim.dmaap.rest; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; - import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; /** * Class to provide REST endpoints for DMaaP simulator component statistics. */ @Path("/events") +@Produces(DmaapSimRestControllerV1.MEDIA_TYPE_APPLICATION_JSON) public class DmaapSimRestControllerV1 extends BaseRestControllerV1 { + public static final String MEDIA_TYPE_APPLICATION_JSON = "application/json"; /** * Get a DMaaP message. @@ -45,72 +46,33 @@ public class DmaapSimRestControllerV1 extends BaseRestControllerV1 { * @param topicName topic to get message from * @param consumerGroup consumer group that is getting the message * @param consumerId consumer ID that is getting the message - * @param timeout timeout for the message + * @param timeoutMs timeout for the message * @return the message */ @GET @Path("{topicName}/{consumerGroup}/{consumerId}") - // @formatter:off - @ApiOperation( - value = "Get a DMaaP event on a topic", - notes = "Returns an event on a DMaaP topic", - response = Object.class, - authorizations = - @Authorization(value = AUTHORIZATION_TYPE) - ) - @ApiResponses( - value = { - @ApiResponse( - code = AUTHENTICATION_ERROR_CODE, - message = AUTHENTICATION_ERROR_MESSAGE), - @ApiResponse( - code = AUTHORIZATION_ERROR_CODE, - message = AUTHORIZATION_ERROR_MESSAGE), - @ApiResponse( - code = SERVER_ERROR_CODE, - message = SERVER_ERROR_MESSAGE) - } - ) - // @formatter:on - public Response getDmaaapMessage(@PathParam("topicName") final String topicName, - @PathParam("consumerGroup") final String consumerGroup, @PathParam("consumerId") final String consumerId, - @QueryParam("timeout") final int timeout) { + public Response getDmaapMessage(@PathParam("topicName") final String topicName, + @PathParam("consumerGroup") final String consumerGroup, + @PathParam("consumerId") final String consumerId, + @QueryParam("limit") @DefaultValue("1") final int limit, + @QueryParam("timeout") @DefaultValue("15000") final long timeoutMs) { - return new DmaapSimProvider().processDmaapMessageGet(topicName, consumerGroup, consumerId, timeout); + return DmaapSimProvider.getInstance().processDmaapMessageGet(topicName, consumerGroup, consumerId, limit, + timeoutMs); } /** * Post a DMaaP message. * - * @param topicName topic to get message from415 + * @param topicName topic to get message from * @return the response to the post */ @POST @Path("{topicName}") - // @formatter:off - @ApiOperation( - value = "Post a DMaaP event on a topic", - notes = "Returns an event on a DMaaP topic", - response = Response.class, - authorizations = - @Authorization(value = AUTHORIZATION_TYPE) - ) - @ApiResponses( - value = { - @ApiResponse( - code = AUTHENTICATION_ERROR_CODE, - message = AUTHENTICATION_ERROR_MESSAGE), - @ApiResponse( - code = AUTHORIZATION_ERROR_CODE, - message = AUTHORIZATION_ERROR_MESSAGE), - @ApiResponse( - code = SERVER_ERROR_CODE, - message = SERVER_ERROR_MESSAGE) - } - ) - // @formatter:on - public Response postDmaaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) { + @Consumes(value = {CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA, + TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN, MEDIA_TYPE_APPLICATION_JSON}) + public Response postDmaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) { - return new DmaapSimProvider().processDmaapMessagePut(topicName, dmaapMessage); + return DmaapSimProvider.getInstance().processDmaapMessagePut(topicName, dmaapMessage); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java index 28de42c21..b05a0fe1a 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java @@ -21,28 +21,21 @@ package org.onap.policy.models.sim.dmaap.rest; -import java.util.ArrayList; import java.util.List; import java.util.Properties; - -import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.gson.GsonMessageBodyHandler; +import org.onap.policy.common.utils.services.ServiceManagerContainer; import org.onap.policy.models.sim.dmaap.parameters.RestServerParameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Class to manage life cycle of DMaaP Simulator rest server. */ -public class DmaapSimRestServer implements Startable { - - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimRestServer.class); +public class DmaapSimRestServer extends ServiceManagerContainer { - private List<HttpServletServer> servers = new ArrayList<>(); - - private RestServerParameters restServerParameters; + private final List<HttpServletServer> servers; /** * Constructor for instantiating DmaapSimRestServer. @@ -50,91 +43,40 @@ public class DmaapSimRestServer implements Startable { * @param restServerParameters the rest server parameters */ public DmaapSimRestServer(final RestServerParameters restServerParameters) { - this.restServerParameters = restServerParameters; - } + this.servers = HttpServletServerFactoryInstance.getServerFactory() + .build(getServerProperties(restServerParameters)); - /** - * {@inheritDoc}. - */ - @Override - public boolean start() { - try { - servers = HttpServletServerFactoryInstance.getServerFactory().build(getServerProperties()); - for (final HttpServletServer server : servers) { - server.start(); - } - } catch (final Exception exp) { - LOGGER.error("Failed to start DMaaP simulator http server", exp); - return false; + for (HttpServletServer server : this.servers) { + addAction("REST " + server.getName(), server::start, server::stop); } - return true; } /** - * Creates the server properties object using restServerParameters. + * Creates a set of properties, suitable for building a REST server, from the + * parameters. * - * @return the properties object + * @param restServerParameters parameters from which to build the properties + * @return a set of properties representing the given parameters */ - private Properties getServerProperties() { + public static Properties getServerProperties(RestServerParameters restServerParameters) { final Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName()); final String svcpfx = - PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); + PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost()); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, - Integer.toString(restServerParameters.getPort())); + Integer.toString(restServerParameters.getPort())); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - String.join(",", DmaapSimRestControllerV1.class.getName())); + DmaapSimRestControllerV1.class.getName()); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); - props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true"); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false"); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, - CambriaMessageBodyHandler.class.getName() + "," + JsonMessageBodyHandler.class.getName()); + String.join(",", CambriaMessageBodyHandler.class.getName(), + GsonMessageBodyHandler.class.getName(), + TextMessageBodyHandler.class.getName())); return props; } - - /** - * {@inheritDoc}. - */ - @Override - public boolean stop() { - for (final HttpServletServer server : servers) { - try { - server.stop(); - } catch (final Exception exp) { - LOGGER.error("Failed to stop DMaaP simulator http server", exp); - } - } - return true; - } - - /** - * {@inheritDoc}. - */ - @Override - public void shutdown() { - stop(); - } - - /** - * {@inheritDoc}. - */ - @Override - public boolean isAlive() { - return !servers.isEmpty(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("DmaapSimRestServer [servers="); - builder.append(servers); - builder.append("]"); - return builder.toString(); - } - } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java deleted file mode 100644 index a3eebda00..000000000 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * ============LICENSE_START======================================================= ONAP - * ================================================================================ Copyright (C) 2019 AT&T Intellectual - * Property. All rights reserved. ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.models.sim.dmaap.rest; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; -import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyReader; -import javax.ws.rs.ext.Provider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provider that serializes and de-serializes JSON via gson. - */ -@Provider -@Consumes(MediaType.APPLICATION_JSON) -@Produces(MediaType.APPLICATION_JSON) -public class JsonMessageBodyHandler implements MessageBodyReader<Object> { - public static final Logger logger = LoggerFactory.getLogger(JsonMessageBodyHandler.class); - - @Override - public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return MediaType.APPLICATION_JSON.equals(mediaType.toString()); - } - - @Override - public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, String> httpHeaders, InputStream entityStream) - throws IOException { - - String jsonString = ""; - try (BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(entityStream))) { - String line; - while ((line = bufferedReader.readLine()) != null) { - jsonString += line; - } - - return jsonString; - } - } -} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java new file mode 100644 index 000000000..3c903c82b --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java @@ -0,0 +1,66 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import javax.ws.rs.Consumes; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; + +/** + * Provider that decodes "text/plain" messages. + */ +@Provider +@Consumes(TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN) +public class TextMessageBodyHandler implements MessageBodyReader<Object> { + public static final String MEDIA_TYPE_TEXT_PLAIN = "text/plain"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { + return (mediaType != null && MEDIA_TYPE_TEXT_PLAIN.equals(mediaType.toString())); + } + + @Override + public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException { + + try (BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) { + List<Object> messages = new LinkedList<>(); + String msg; + while ((msg = bufferedReader.readLine()) != null) { + messages.add(msg); + } + + return messages; + } + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java index 899c0e081..b9e0efaaf 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +21,15 @@ package org.onap.policy.models.sim.dmaap.startstop; -import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.services.ServiceManagerContainer; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer; /** * This class activates the DMaaP simulator as a complete service. */ public class DmaapSimActivator extends ServiceManagerContainer { - /** - * The DMaaP simulator REST API server. - */ - private DmaapSimRestServer restServer; /** * Instantiate the activator for the DMaaP simulator as a complete service. @@ -42,19 +39,11 @@ public class DmaapSimActivator extends ServiceManagerContainer { public DmaapSimActivator(final DmaapSimParameterGroup dmaapSimParameterGroup) { super("DMaaP Simulator"); - // @formatter:off - addAction("DMaaP Simulator parameters", - () -> ParameterService.register(dmaapSimParameterGroup), - () -> ParameterService.deregister(dmaapSimParameterGroup.getName())); - - addAction("Create REST server", - () -> restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()), - () -> restServer = null - ); + DmaapSimProvider provider = new DmaapSimProvider(dmaapSimParameterGroup); + DmaapSimProvider.setInstance(provider); + addAction("Sim Provider", provider::start, provider::stop); - addAction("REST server", - () -> restServer.start(), - () -> restServer.stop()); - // @formatter:on + DmaapSimRestServer restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()); + addAction("REST server", restServer::start, restServer::stop); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java index cf559f712..724c3dc35 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java @@ -26,13 +26,15 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.URL; import java.util.Arrays; - +import lombok.Getter; +import lombok.Setter; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.utils.resources.ResourceUtils; import org.onap.policy.models.sim.dmaap.DmaapSimException; import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; @@ -46,6 +48,9 @@ public class DmaapSimCommandLineArguments { private static final int HELP_LINE_LENGTH = 120; private final Options options; + + @Getter + @Setter private String configurationFilePath = null; /** @@ -145,7 +150,7 @@ public class DmaapSimCommandLineArguments { * @throws DmaapSimException on command argument validation errors */ public void validate() throws DmaapSimException { - validateReadableFile("DMaaP simulator configuration", configurationFilePath); + validateFileExists("DMaaP simulator configuration", configurationFilePath); } /** @@ -175,15 +180,6 @@ public class DmaapSimCommandLineArguments { } /** - * Gets the configuration file path. - * - * @return the configuration file path - */ - public String getConfigurationFilePath() { - return configurationFilePath; - } - - /** * Gets the full expanded configuration file path. * * @return the configuration file path @@ -193,16 +189,6 @@ public class DmaapSimCommandLineArguments { } /** - * Sets the configuration file path. - * - * @param configurationFilePath the configuration file path - */ - public void setConfigurationFilePath(final String configurationFilePath) { - this.configurationFilePath = configurationFilePath; - - } - - /** * Check set configuration file path. * * @return true, if check set configuration file path @@ -212,14 +198,14 @@ public class DmaapSimCommandLineArguments { } /** - * Validate readable file. + * Validate file exists. * * @param fileTag the file tag * @param fileName the file name * @throws DmaapSimException on the file name passed as a parameter */ - private void validateReadableFile(final String fileTag, final String fileName) throws DmaapSimException { - if (fileName == null || fileName.length() == 0) { + private void validateFileExists(final String fileTag, final String fileName) throws DmaapSimException { + if (StringUtils.isBlank(fileName)) { throw new DmaapSimException(fileTag + " file was not specified as an argument"); } @@ -233,11 +219,5 @@ public class DmaapSimCommandLineArguments { if (!theFile.exists()) { throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist"); } - if (!theFile.isFile()) { - throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file"); - } - if (!theFile.canRead()) { - throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable"); - } } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java index 878d008a8..7b4f41b07 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +22,6 @@ package org.onap.policy.models.sim.dmaap.startstop; import java.util.Arrays; - -import org.onap.policy.common.utils.services.Registry; -import org.onap.policy.models.sim.dmaap.DmaapSimConstants; import org.onap.policy.models.sim.dmaap.DmaapSimException; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; @@ -75,14 +73,12 @@ public class Main { // Now, create the activator for the DMaaP Simulator service activator = new DmaapSimActivator(parameterGroup); - Registry.register(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR, activator); // Start the activator try { activator.start(); } catch (final RuntimeException e) { LOGGER.error("start of DMaaP simulator service failed, used parameters are {}", Arrays.toString(args), e); - Registry.unregister(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR); return; } @@ -110,7 +106,7 @@ public class Main { parameterGroup = null; // clear the DMaaP simulator activator - if (activator != null) { + if (activator != null && activator.isAlive()) { activator.stop(); } } @@ -128,8 +124,9 @@ public class Main { public void run() { try { // Shutdown the DMaaP simulator service and wait for everything to stop - activator.stop(); - } catch (final RuntimeException e) { + shutdown(); + + } catch (final RuntimeException | DmaapSimException e) { LOGGER.warn("error occured during shut down of the DMaaP simulator service", e); } } diff --git a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json index dd2477a24..e936eb034 100644 --- a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json +++ b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json @@ -1,5 +1,6 @@ { "name": "DMaapSim", + "topicSweepSec": 900, "restServerParameters": { "host": "0.0.0.0", "port": 3904 diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java new file mode 100644 index 000000000..4e37a5e36 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; + +public class DmaapSimXxxExceptionTest { + + @Test + public void testDmaapSimException() { + assertEquals(3, new ExceptionsTester().test(DmaapSimException.class)); + } + + @Test + public void testDmaapSimRuntimeException() { + assertEquals(3, new ExceptionsTester().test(DmaapSimRuntimeException.class)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java new file mode 100644 index 000000000..4513ffb82 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java @@ -0,0 +1,305 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ConsumerGroupDataTest { + private static final int WAIT_MS = 5000; + private static final int MIN_WAIT_MS = WAIT_MS / 2; + private static final String MY_TOPIC = "my-topic"; + private static final String MY_CONSUMER = "my-consumer"; + private static final String MSG1 = "hello"; + private static final String MSG2 = "there"; + private static final String MSG3 = "world"; + private static final int MAX_THREADS = 30; + + private MyData data; + private MyReader thread; + private List<MyReader> threads; + + /** + * Sets up. + */ + @Before + public void setUp() { + data = new MyData(); + thread = null; + threads = new ArrayList<>(MAX_THREADS); + } + + /** + * Stops any running thread. + */ + @After + public void tearDown() { + for (MyReader thr : threads) { + thr.interrupt(); + } + + for (MyReader thr : threads) { + thr.await(); + } + } + + @Test + public void testShouldRemove() throws InterruptedException { + assertFalse(data.shouldRemove()); + assertTrue(data.shouldRemove()); + + data = new MyData(); + + // start a reader thread and wait for it to poll its queue + startReader(0, 10); + assertTrue(data.await()); + + assertFalse(data.shouldRemove()); + } + + @Test + public void testRead() { + data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3); + + // this reader only wants one + startReader(1, 1); + assertTrue(thread.await()); + assertEquals("[hello]", thread.result.toString()); + + // this reader wants three + startReader(3, 1); + assertTrue(thread.await()); + assertEquals("[there, world, hello]", thread.result.toString()); + + // this reader wants three, but will only get two + startReader(3, 1); + assertTrue(thread.await()); + assertEquals("[there, world]", thread.result.toString()); + } + + @Test + public void testRead_Idle() throws InterruptedException { + // force it to idle + data.shouldRemove(); + data.shouldRemove(); + + long tbeg = System.currentTimeMillis(); + assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS)); + + // should not have waited + assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS); + } + + @Test + public void testRead_NegativeCount() throws InterruptedException { + data.enqueue(MSG1, MSG2); + startReader(-1, 3); + assertTrue(data.await()); + + // wait time should be unaffected + assertEquals(3L, data.waitMs2); + + assertTrue(thread.await()); + + // should only return one message + assertEquals("[hello]", thread.result.toString()); + } + + @Test + public void testRead_NegativeWait() throws InterruptedException { + data.enqueue(MSG1, MSG2, MSG3); + startReader(2, -3); + assertTrue(data.await()); + + assertEquals(0L, data.waitMs2); + + assertTrue(thread.await()); + + // should return two messages, as requested + assertEquals("[hello, there]", thread.result.toString()); + } + + @Test + public void testRead_NoMessages() throws InterruptedException { + startReader(0, 0); + assertTrue(data.await()); + + assertTrue(thread.await()); + assertTrue(thread.result.isEmpty()); + } + + @Test + public void testRead_MultiThreaded() { + // queue up a bunch of messages + final int expected = MAX_THREADS * 3; + for (int x = 0; x < expected; ++x) { + data.enqueue(MSG1); + } + + for (int x = 0; x < MAX_THREADS; ++x) { + startReader(4, 1); + } + + int actual = 0; + for (MyReader thr : threads) { + thr.await(); + actual += thr.result.size(); + } + + assertEquals(expected, actual); + } + + + /** + * Starts a reader thread. + * + * @param limit number of messages to read at one time + * @param waitMs wait time, in milliseconds + */ + private void startReader(int limit, long waitMs) { + thread = new MyReader(limit, waitMs); + + thread.setDaemon(true); + thread.start(); + + threads.add(thread); + } + + + private class MyData extends ConsumerGroupData { + + /** + * Decremented when {@link #getNextMessage(long)} is invoked. + */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** + * Messages to be added to the queue when {@link #getNextMessage(long)} is + * invoked. + */ + private final List<String> messages = new ArrayList<>(); + + /** + * Value passed to {@link #getNextMessage(long)}. + */ + private volatile long waitMs2 = -1; + + /** + * Constructs the object. + */ + public MyData() { + super(MY_TOPIC, MY_CONSUMER); + } + + /** + * Arranges for messages to be injected into the queue the next time + * {@link #getNextMessage(long)} is invoked. + * + * @param messages the messages to be injected + */ + public void enqueue(String... messages) { + this.messages.addAll(Arrays.asList(messages)); + } + + @Override + protected String getNextMessage(long waitMs) throws InterruptedException { + waitMs2 = waitMs; + + latch.countDown(); + + synchronized (messages) { + write(messages); + messages.clear(); + } + + return super.getNextMessage(waitMs); + } + + /** + * Waits for {@link #getNextMessage(long)} to be invoked. + * + * @return {@code true} if {@link #getNextMessage(long)} was invoked, + * {@code false} if the timer expired first + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean await() throws InterruptedException { + return latch.await(WAIT_MS, TimeUnit.MILLISECONDS); + } + } + + /** + * Thread that will invoke the consumer group's read() method one time. + */ + private class MyReader extends Thread { + private final ConsumerGroupData group = data; + private final int limit; + private final long waitMs; + + /** + * Result returned by the read() method. + */ + private List<String> result = Collections.emptyList(); + + public MyReader(int limit, long waitMs) { + this.limit = limit; + this.waitMs = waitMs; + } + + @Override + public void run() { + try { + result = group.read(limit, waitMs); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Waits for the thread to complete. + * + * @return {@code true} if the thread completed, {@code false} if the thread is + * still running + */ + public boolean await() { + try { + this.join(WAIT_MS); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return !this.isAlive(); + } + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java new file mode 100644 index 000000000..f8c141614 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java @@ -0,0 +1,287 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; + +public class DmaapSimProviderTest { + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final long SWEEP_SEC = 10L; + private static final String TOPIC1 = "topic-A"; + private static final String TOPIC2 = "topic-B"; + private static final String CONSUMER1 = "consumer-X"; + private static final String CONSUMER_ID1 = "id1"; + + private MyProvider prov; + + @Mock + private DmaapSimParameterGroup params; + + @Mock + private ScheduledExecutorService timer; + + @Mock + private TopicData data1; + + @Mock + private TopicData data2; + + @Captor + private ArgumentCaptor<List<Object>> listCaptor; + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC); + + prov = new MyProvider(params); + } + + /** + * Shuts down the provider, if it's running. + */ + @After + public void tearDown() { + if (prov.isAlive()) { + prov.shutdown(); + } + } + + /** + * Verifies that the constructor adds all of the expected actions to the service + * manager container. + */ + @Test + public void testDmaapSimProvider() { + prov.start(); + verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS)); + + prov.stop(); + verify(timer).shutdown(); + } + + @Test + public void testProcessDmaapMessagePut_List() throws CoderException { + prov = spy(new MyProvider(params)); + + when(data1.write(any())).thenReturn(2); + + // force topics to exist + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0); + + List<Object> lst = Arrays.asList("hello", "world"); + Response resp = prov.processDmaapMessagePut(TOPIC1, lst); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class); + assertEquals("2", sco.getString("count")); + + List<Object> lst2 = Arrays.asList("helloB", "worldB"); + prov.processDmaapMessagePut(TOPIC1, lst2); + prov.processDmaapMessagePut(TOPIC2, lst2); + + // should only invoke this once for each topic + verify(prov).makeTopicData(TOPIC1); + verify(prov).makeTopicData(TOPIC2); + + // should process all writes + verify(data1).write(lst); + verify(data1).write(lst2); + + verify(data2).write(lst2); + } + + @Test + public void testProcessDmaapMessagePut_Single() throws CoderException { + prov = spy(new MyProvider(params)); + + // force topics to exist + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0); + + final String value1 = "abc"; + Response resp = prov.processDmaapMessagePut(TOPIC1, value1); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + + // ensure that the response can be decoded + new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class); + + final String value2 = "def"; + prov.processDmaapMessagePut(TOPIC1, value2); + prov.processDmaapMessagePut(TOPIC2, value2); + + // should only invoke this once for each topic + verify(prov).makeTopicData(TOPIC1); + verify(prov).makeTopicData(TOPIC2); + + // should process all writes as singleton lists + listCaptor.getAllValues().clear(); + verify(data1, times(2)).write(listCaptor.capture()); + assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0)); + assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1)); + + listCaptor.getAllValues().clear(); + verify(data2).write(listCaptor.capture()); + assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0)); + } + + @Test + public void testProcessDmaapMessageGet() throws InterruptedException { + List<String> msgs = Arrays.asList("400", "500"); + when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs); + + Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(msgs.toString(), resp.getEntity().toString()); + } + + @Test + public void testProcessDmaapMessageGet_Timeout() throws InterruptedException { + when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList()); + + Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L); + assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + } + + @Test + public void testProcessDmaapMessageGet_Ex() throws InterruptedException { + BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>(); + + // put in a background thread so it doesn't interrupt the tester thread + new Thread(() -> { + try { + when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION)); + respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + + Response resp = respQueue.poll(3, TimeUnit.SECONDS); + assertNotNull(resp); + + assertEquals(Status.GONE.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + } + + @Test + public void testSweepTopicTaskRun() { + prov.start(); + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); + + captor.getValue().run(); + verify(data1).removeIdleConsumers(); + verify(data2).removeIdleConsumers(); + + // run it again + captor.getValue().run(); + verify(data1, times(2)).removeIdleConsumers(); + verify(data2, times(2)).removeIdleConsumers(); + } + + @Test + public void testMakeTimerPool() { + // use a real provider so we can test the real makeTimer() method + DmaapSimProvider prov2 = new DmaapSimProvider(params); + prov2.start(); + prov2.stop(); + } + + @Test + public void testMakeTopicData() { + // use a real provider so we can test the real makeTopicData() method + DmaapSimProvider prov2 = new DmaapSimProvider(params); + prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0); + } + + @Test + public void testGetInstance_testSetInstance() { + DmaapSimProvider.setInstance(prov); + assertSame(prov, DmaapSimProvider.getInstance()); + + DmaapSimProvider.setInstance(null); + assertNull(DmaapSimProvider.getInstance()); + } + + + public class MyProvider extends DmaapSimProvider { + + public MyProvider(DmaapSimParameterGroup params) { + super(params); + } + + @Override + protected ScheduledExecutorService makeTimerPool() { + return timer; + } + + @Override + protected TopicData makeTopicData(String topicName) { + switch (topicName) { + case TOPIC1: + return data1; + case TOPIC2: + return data2; + default: + throw new IllegalArgumentException("unknown topic name: " + topicName); + } + } + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java new file mode 100644 index 000000000..f7e1f5e6c --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java @@ -0,0 +1,213 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; + +public class TopicDataTest { + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String GROUP1 = "group-A"; + private static final String GROUP2 = "group-B"; + private static final String GROUP3 = "group-C"; + + private TopicData data; + private ConsumerGroupData consgrp1; + private ConsumerGroupData consgrp2; + private ConsumerGroupData consgrp3; + private List<ConsumerGroupData> groups; + + /** + * Sets up mocks and the initial data object. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + consgrp1 = mock(ConsumerGroupData.class); + consgrp2 = mock(ConsumerGroupData.class); + consgrp3 = mock(ConsumerGroupData.class); + + when(consgrp1.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + when(consgrp2.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + when(consgrp3.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + + groups = new LinkedList<>(Arrays.asList(consgrp1, consgrp2, consgrp3)); + + data = new TopicData("my-topic") { + @Override + protected ConsumerGroupData makeData(String consumerGroup) { + return groups.remove(0); + } + }; + } + + @Test + public void testRemoveIdleConsumers() throws Exception { + // force two consumers into the map + data.read(GROUP1, 0, 0); + data.read(GROUP2, 0, 0); + data.read(GROUP3, 0, 0); + + // indicate that one should be removed + when(consgrp1.shouldRemove()).thenReturn(true); + + // sweep + data.removeIdleConsumers(); + + assertEquals("[group-B, group-C]", new TreeSet<>(getGroups().keySet()).toString()); + + // indicate that the others should be removed + when(consgrp2.shouldRemove()).thenReturn(true); + when(consgrp3.shouldRemove()).thenReturn(true); + + // sweep + data.removeIdleConsumers(); + + assertTrue(getGroups().isEmpty()); + } + + @Test + public void testRead() throws Exception { + List<String> lst = Collections.emptyList(); + + when(consgrp1.read(anyInt(), anyLong())).thenReturn(ConsumerGroupData.UNREADABLE_LIST) + .thenReturn(ConsumerGroupData.UNREADABLE_LIST).thenReturn(lst); + + assertSame(lst, data.read(GROUP1, 10, 20)); + + // should have invoked three times + verify(consgrp1, times(3)).read(anyInt(), anyLong()); + + // should have used the given values + verify(consgrp1, times(3)).read(10, 20); + + // should not have allocated more than one group + assertEquals(2, groups.size()); + } + + @Test + public void testRead_MultipleGroups() throws Exception { + List<String> lst1 = Collections.emptyList(); + when(consgrp1.read(anyInt(), anyLong())).thenReturn(lst1); + + List<String> lst2 = Collections.emptyList(); + when(consgrp2.read(anyInt(), anyLong())).thenReturn(lst2); + + // one from each group + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // repeat + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // again + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // should still have group3 in the list + assertEquals(1, groups.size()); + } + + @Test + public void testWrite() throws Exception { + // no groups yet + List<Object> messages = Arrays.asList("hello", "world"); + data.write(messages); + + // add two groups + data.read(GROUP1, 0, 0); + data.read(GROUP2, 0, 0); + + data.write(messages); + + // should have been written to both groups + List<String> strings = messages.stream().map(Object::toString).collect(Collectors.toList()); + verify(consgrp1).write(strings); + verify(consgrp2).write(strings); + } + + @Test + public void testConvertMessagesToStrings() { + assertEquals("[abc, 200]", data.convertMessagesToStrings(Arrays.asList("abc", null, 200)).toString()); + } + + @Test + public void testConvertMessageToString() throws CoderException { + Coder coder = new StandardCoder(); + + assertNull(data.convertMessageToString(null, coder)); + assertEquals("text-msg", data.convertMessageToString("text-msg", coder)); + assertEquals("100", data.convertMessageToString(100, coder)); + + coder = mock(Coder.class); + when(coder.encode(any())).thenThrow(new CoderException(EXPECTED_EXCEPTION)); + assertNull(data.convertMessageToString(new TreeMap<String,Object>(), coder)); + } + + @Test + public void testMakeData() throws Exception { + // use real objects instead of mocks + TopicData data2 = new TopicData("real-data-topic"); + + // force a group into the topic + data2.read(GROUP1, 0, 0); + + data2.write(Arrays.asList("abc", "def", "ghi")); + + assertEquals("[abc, def]", data2.read(GROUP1, 2, 0).toString()); + } + + /** + * Gets the consumer group map from the topic data object. + * + * @return the topic's consumer group map + */ + @SuppressWarnings("unchecked") + private Map<String, ConsumerGroupData> getGroups() { + return (Map<String, ConsumerGroupData>) Whitebox.getInternalState(data, "group2data"); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java new file mode 100644 index 000000000..5cba78355 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.UUID; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import org.junit.Before; +import org.junit.Test; + +public class BaseRestControllerV1Test { + + private BaseRestControllerV1 ctlr; + private ResponseBuilder bldr; + + @Before + public void setUp() { + ctlr = new BaseRestControllerV1(); + bldr = Response.status(Response.Status.OK); + } + + @Test + public void testAddVersionControlHeaders() { + Response resp = ctlr.addVersionControlHeaders(bldr).build(); + assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_MINOR_NAME)); + assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_PATCH_NAME)); + assertEquals("1.0.0", resp.getHeaderString(BaseRestControllerV1.VERSION_LATEST_NAME)); + } + + @Test + public void testAddLoggingHeaders_Null() { + Response resp = ctlr.addLoggingHeaders(bldr, null).build(); + assertNotNull(resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME)); + } + + @Test + public void testAddLoggingHeaders_NonNull() { + UUID uuid = UUID.randomUUID(); + Response resp = ctlr.addLoggingHeaders(bldr, uuid).build(); + assertEquals(uuid.toString(), resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java new file mode 100644 index 000000000..5d9186c75 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java @@ -0,0 +1,145 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import javax.ws.rs.core.MediaType; +import org.junit.Before; +import org.junit.Test; + +public class CambriaMessageBodyHandlerTest { + private static final String STD_INPUT = "1.3.XAbc"; + private static final String EXPECTED_OUTPUT = "[Abc]"; + + private CambriaMessageBodyHandler hdlr; + + @Before + public void setUp() { + hdlr = new CambriaMessageBodyHandler(); + } + + @Test + public void testIsReadable() { + assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("application/cambria"))); + + assertFalse(hdlr.isReadable(null, null, null, null)); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("application/other"))); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/cambria"))); + } + + @Test + public void testReadFrom() throws IOException { + List<Object> lst = readStream("1.11.AMessageBody", "3.3.123Foo3.3.123Bar", "0.16.You can do that..8.Or that."); + assertEquals("[MessageBody, Foo, Bar, You can do that., Or that.]", lst.toString()); + + // empty stream + lst = readStream(); + assertEquals("[]", lst.toString()); + } + + @Test + public void testReadMessage_InvalidPartitionLength() { + assertThatThrownBy(() -> readStream("100000000.3.")).isInstanceOf(IOException.class) + .hasMessage("invalid partition length"); + } + + @Test + public void testReadMessage_InvalidMessageLength() { + assertThatThrownBy(() -> readStream("3.100000000.ABC")).isInstanceOf(IOException.class) + .hasMessage("invalid message length"); + } + + @Test + public void testSkipWhitespace() throws IOException { + // no white space + assertEquals(EXPECTED_OUTPUT, readStream(STD_INPUT).toString()); + + // single white space + assertEquals(EXPECTED_OUTPUT, readStream(" " + STD_INPUT).toString()); + + // multiple white spaces + assertEquals(EXPECTED_OUTPUT, readStream("\n\n\t" + STD_INPUT).toString()); + } + + @Test + public void testReadLength_NoDigits() throws IOException { + assertEquals("[]", readStream("..").toString()); + } + + @Test + public void testReadLength_NoDot() { + assertThatThrownBy(() -> readStream("3.2")).isInstanceOf(EOFException.class) + .hasMessage("missing '.' in 'length' field"); + } + + @Test + public void testReadLength_NonDigit() { + assertThatThrownBy(() -> readStream("3.2x.ABCde")).isInstanceOf(IOException.class) + .hasMessage("invalid character in 'length' field"); + } + + @Test + public void testReadLength_TooManyDigits() { + assertThatThrownBy(() -> readStream("3.12345678901234567890.ABCde")).isInstanceOf(IOException.class) + .hasMessage("too many digits in 'length' field"); + } + + @Test + public void testReadString_ZeroLength() throws IOException { + assertEquals("[]", readStream("1..X").toString()); + } + + @Test + public void testReadString_TooShort() { + assertThatThrownBy(() -> readStream(".5.me")).isInstanceOf(EOFException.class).hasMessageContaining("actual"); + } + + /** + * Reads a stream via the handler. + * + * @param text lines of text to be read + * @return the list of objects that were decoded from the stream + * @throws IOException if an error occurs + */ + private List<Object> readStream(String... text) throws IOException { + return hdlr.readFrom(null, null, null, null, null, makeStream(text)); + } + + /** + * Creates an input stream from lines of text. + * + * @param text lines of text + * @return an input stream + */ + private InputStream makeStream(String... text) { + return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java new file mode 100644 index 000000000..7e30c5a4c --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java @@ -0,0 +1,181 @@ +/* + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import java.io.File; +import java.io.FileOutputStream; +import java.nio.charset.StandardCharsets; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import lombok.Getter; +import org.glassfish.jersey.client.ClientProperties; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.gson.GsonMessageBodyHandler; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.startstop.Main; +import org.onap.policy.sim.dmaap.parameters.CommonTestData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Common base class for rest server tests. + */ +public class CommonRestServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(CommonRestServer.class); + + public static final String NOT_ALIVE = "not alive"; + public static final String ALIVE = "alive"; + public static final String SELF = "self"; + public static final String NAME = "DMaaP Simulator"; + public static final String ENDPOINT_PREFIX = "events/"; + + @Getter + private static int port; + + protected static String httpPrefix; + + private static Main main; + + /** + * Allocates a port for the server, writes a config file, and then starts Main. + * + * @throws Exception if an error occurs + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + port = NetworkUtil.allocPort(); + + httpPrefix = "http://localhost:" + port + "/"; + + String json = new CommonTestData().getParameterGroupAsString(port); + makeConfigFile("src/test/resources/parameters/TestConfigParams.json", json); + + HttpServletServerFactoryInstance.getServerFactory().destroy(); + + startMain(); + } + + /** + * Stops Main. + */ + @AfterClass + public static void teardownAfterClass() { + try { + if (main != null) { + Main main2 = main; + main = null; + + main2.shutdown(); + } + + } catch (DmaapSimException exp) { + LOGGER.error("cannot stop main", exp); + } + } + + /** + * Set up. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + // restart, if not currently running + if (main == null) { + startMain(); + } + } + + /** + * Makes a parameter configuration file. + * @param fileName name of the config file to be created + * @param json json to be written to the file + * + * @throws Exception if an error occurs + */ + protected static void makeConfigFile(String fileName, String json) throws Exception { + File file = new File(fileName); + file.deleteOnExit(); + + try (FileOutputStream output = new FileOutputStream(file)) { + output.write(json.getBytes(StandardCharsets.UTF_8)); + } + } + + /** + * Starts the "Main". + * + * @throws Exception if an error occurs + */ + private static void startMain() throws Exception { + Registry.newRegistry(); + + // make sure port is available + if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) { + throw new IllegalStateException("port " + port + " is still in use"); + } + + final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"}; + + main = new Main(simConfigParameters); + + if (!NetworkUtil.isTcpPortOpen("localhost", port, 60, 1000L)) { + throw new IllegalStateException("server is not listening on port " + port); + } + } + + /** + * Sends a request to an endpoint. + * + * @param endpoint the target endpoint + * @return a request builder + * @throws Exception if an error occurs + */ + protected Invocation.Builder sendRequest(final String endpoint) throws Exception { + return sendFqeRequest(httpPrefix + ENDPOINT_PREFIX + endpoint); + } + + /** + * Sends a request to a fully qualified endpoint. + * + * @param fullyQualifiedEndpoint the fully qualified target endpoint + * @return a request builder + */ + protected Invocation.Builder sendFqeRequest(final String fullyQualifiedEndpoint) { + final Client client = ClientBuilder.newBuilder().build(); + + client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); + client.register(GsonMessageBodyHandler.class); + + final WebTarget webTarget = client.target(fullyQualifiedEndpoint); + + return webTarget.request(MediaType.APPLICATION_JSON); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java new file mode 100644 index 000000000..7b84d543d --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.Arrays; +import java.util.Map; +import javax.ws.rs.core.Response; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; + +public class DmaapSimRestControllerV1Test { + private static final int LIMIT = 5; + private static final String TOPIC = "my-topic"; + private static final String TOPIC2 = "my-topic-B"; + private static final String MESSAGE = "my-message"; + private static final String MESSAGE2 = "my-message-B"; + private static final String CONSUMER = "my-consumer"; + private static final String CONSUMER_ID = "my-id"; + + private static Coder coder = new StandardCoder(); + + private DmaapSimRestControllerV1 rest; + + /** + * Creates the controller. + * + * @throws CoderException if the parameters cannot be loaded + */ + @Before + public void setUp() throws CoderException { + DmaapSimParameterGroup params = coder.decode(new File("src/test/resources/parameters/NormalParameters.json"), + DmaapSimParameterGroup.class); + DmaapSimProvider.setInstance(new DmaapSimProvider(params)); + rest = new DmaapSimRestControllerV1(); + } + + @Test + public void test() { + Response resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + + // add some messages + resp = rest.postDmaapMessage(TOPIC, Arrays.asList(MESSAGE, MESSAGE2)); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(2, getCount(resp)); + + resp = rest.postDmaapMessage(TOPIC2, Arrays.asList(MESSAGE, MESSAGE2, MESSAGE)); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(3, getCount(resp)); + + // hadn't registered with topic 2 so nothing expected from there + resp = rest.getDmaapMessage(TOPIC2, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + + // now read from topic 1 + resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[my-message, my-message-B]", resp.getEntity().toString()); + } + + private int getCount(Response resp) { + @SuppressWarnings("unchecked") + Map<String, Object> map = (Map<String, Object>) resp.getEntity(); + + return (int) map.get("count"); + } + +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java new file mode 100644 index 000000000..2dfbae980 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import javax.ws.rs.core.MediaType; +import org.junit.Before; +import org.junit.Test; + +public class TextMessageBodyHandlerTest { + private TextMessageBodyHandler hdlr; + + @Before + public void setUp() { + hdlr = new TextMessageBodyHandler(); + } + + @Test + public void testIsReadable() { + assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("text/plain"))); + + assertFalse(hdlr.isReadable(null, null, null, null)); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("text/other"))); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/plain"))); + } + + @Test + public void testReadFrom() throws IOException { + List<Object> lst = readStream("hello", "world"); + assertEquals("[hello, world]", lst.toString()); + + // empty stream + lst = readStream(); + assertEquals("[]", lst.toString()); + } + + /** + * Reads a stream via the handler. + * + * @param text lines of text to be read + * @return the list of objects that were decoded from the stream + * @throws IOException if an error occurs + */ + private List<Object> readStream(String... text) throws IOException { + return hdlr.readFrom(null, null, null, null, null, makeStream(text)); + } + + /** + * Creates an input stream from lines of text. + * + * @param text lines of text + * @return an input stream + */ + private InputStream makeStream(String... text) { + return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java new file mode 100644 index 000000000..8c35de64e --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java @@ -0,0 +1,199 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.e2e; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.PrintWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.rest.CommonRestServer; + +/** + * This tests the simulator using dmaap endpoints to verify that it works from publisher + * to subscriber. + */ +public class EndToEndTest extends CommonRestServer { + private static final int MAX_WAIT_SEC = 5; + private static final String ORIG_TOPIC = "MY-TOPIC"; + private static final String ORIG_TOPIC2 = "MY-TOPIC-B"; + private static final int MAX_MSG = 200; + + private static int ntests = 0; + private static String topicJson; + + private TopicParameterGroup topicConfig; + + private String topic = "MY-TOPIC"; + private String topic2 = "MY-TOPIC-B"; + + /** + * Messages from the topic are placed here by the endpoint. + */ + private BlockingQueue<String> queue; + + /** + * Messages from topic 2 are placed here by the endpoint. + */ + private BlockingQueue<String> queue2; + + /** + * Starts the rest server. + * + * @throws Exception if an error occurs + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TopicEndpointManager.getManager().shutdown(); + + CommonRestServer.setUpBeforeClass(); + + topicJson = new String( + Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()), + StandardCharsets.UTF_8); + topicJson = topicJson.replace("${port}", String.valueOf(getPort())); + } + + /** + * Starts the topics. + * + * @throws CoderException if the parameters cannot be decoded + */ + @Before + @Override + public void setUp() throws CoderException { + queue = new LinkedBlockingQueue<>(); + queue2 = new LinkedBlockingQueue<>(); + + /* + * change topic names for each test so any listeners that may still exist will not + * grab new messages + */ + + ++ntests; + topic = "my-topic-" + ntests; + topic2 = "my-topic-b" + ntests; + + String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic); + + topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class); + + TopicEndpointManager.getManager().addTopics(topicConfig); + TopicEndpointManager.getManager().start(); + } + + @After + public void tearDown() { + TopicEndpointManager.getManager().shutdown(); + } + + @Test + public void testWithTopicEndpointAtEachEnd() throws InterruptedException { + // register listeners to add events to appropriate queue + TopicEndpointManager.getManager().getDmaapTopicSource(topic) + .register((infra, topic, event) -> queue.add(event)); + TopicEndpointManager.getManager().getDmaapTopicSource(topic2) + .register((infra, topic, event) -> queue2.add(event)); + + // publish events + TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic); + TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2); + for (int x = 0; x < MAX_MSG; ++x) { + sink.send("hello-" + x); + sink2.send("world-" + x); + } + + // verify events where received + for (int x = 0; x < MAX_MSG; ++x) { + assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + } + } + + @Test + public void testCambriaFormat() throws Exception { + test("testCambriaFormat", "application/cambria", + (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n"))); + } + + @Test + public void testJson() throws Exception { + test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]")); + } + + @Test + public void testText() throws Exception { + test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println)); + } + + /** + * Uses a raw URL connection to ensure the server can process messages of the given + * media type. + * + * @param testName name of the test + * @param mediaType media type + * @param writeMessages function that writes messages to a PrintWriter + * @throws Exception if an error occurs + */ + private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages) + throws Exception { + String msg1 = "{'abc':10.0}".replace('\'', '"'); + String msg2 = "{'def':20.0}".replace('\'', '"'); + + TopicEndpointManager.getManager().getDmaapTopicSource(topic) + .register((infra, topic, event) -> queue.add(event)); + + TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0); + URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic()); + + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-type", mediaType); + conn.setDoOutput(true); + conn.connect(); + + try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) { + writeMessages.accept(wtr, Arrays.asList(msg1, msg2)); + } + + assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode()); + + assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java new file mode 100644 index 000000000..21d9ed604 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.parameters; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; + +/** + * Class to hold/create all parameters for test cases. + */ +public class CommonTestData { + public static final String SIM_GROUP_NAME = "DMaapSim"; + + private static final Coder coder = new StandardCoder(); + + /** + * Gets the standard simulator parameters. + * + * @param port port to be inserted into the parameters + * @return the standard simulator parameters + */ + public DmaapSimParameterGroup getParameterGroup(int port) { + try { + return coder.decode(getParameterGroupAsString(port), DmaapSimParameterGroup.class); + + } catch (CoderException e) { + throw new DmaapSimRuntimeException("cannot read simulator parameters", e); + } + } + + /** + * Gets the standard simulator parameters, as a String. + * + * @param port port to be inserted into the parameters + * @return the standard simulator parameters + */ + public String getParameterGroupAsString(int port) { + + try { + File file = new File("src/test/resources/parameters/NormalParameters.json"); + String json = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + + json = json.replace("6845", String.valueOf(port)); + + return json; + + } catch (IOException e) { + throw new DmaapSimRuntimeException("cannot read simulator parameters", e); + } + } + + /** + * Nulls out a field within a JSON string. It does it by adding a field with the same + * name, having a null value, and then prefixing the original field name with "Xxx", + * thus causing the original field and value to be ignored. + * + * @param json JSON string + * @param field field to be nulled out + * @return a new JSON string with the field nulled out + */ + public String nullifyField(String json, String field) { + return json.replace(field + "\"", field + "\":null, \"" + field + "Xxx\""); + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java index 1682eb991..828cd89b0 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java @@ -1,6 +1,6 @@ -/* +/*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,17 +16,19 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.models.sim.dmaap; +package org.onap.policy.sim.dmaap.parameters; -/** - * Names of various items contained in the Registry. - */ -public class DmaapSimConstants { +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; - // Registry keys - public static final String REG_DMAAP_SIM_ACTIVATOR = "object:activator/dmaap-sim"; +public class DmaapSimParameterGroupTest { + private static final String MY_NAME = "my-name"; - private DmaapSimConstants() { - super(); + @Test + public void testDmaapSimParameterGroup() { + DmaapSimParameterGroup params = new DmaapSimParameterGroup(MY_NAME); + assertEquals(MY_NAME, params.getName()); } } diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java new file mode 100644 index 000000000..8f053d219 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.parameters; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments; + +public class DmaapSimParameterHandlerTest { + + private static final String RESOURCE_DIR = "src/test/resources/parameters/"; + + private DmaapSimParameterHandler handler; + + @Before + public void setUp() { + handler = new DmaapSimParameterHandler(); + } + + @Test + public void testGetParameters() throws DmaapSimException { + final DmaapSimCommandLineArguments args = new DmaapSimCommandLineArguments(); + + args.parse(new String[] {"-c", RESOURCE_DIR + "NormalParameters.json"}); + DmaapSimParameterGroup params = handler.getParameters(args); + assertNotNull(params); + assertEquals("DMaapSim", params.getName()); + assertEquals(300L, params.getTopicSweepSec()); + assertEquals(6845, params.getRestServerParameters().getPort()); + + + args.parse(new String[] {"-c", "FileNotFound.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageStartingWith("error reading parameters"); + + + args.parse(new String[] {"-c", RESOURCE_DIR + "EmptyParameterFile.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageStartingWith("no parameters found"); + + + args.parse(new String[] {"-c", RESOURCE_DIR + "Parameters_InvalidName.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageContaining("validation error"); + } + +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java new file mode 100644 index 000000000..380a72423 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.startstop; + +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimActivator; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments; + + +/** + * Class to perform unit test of {@link DmaapSimActivator}}. + */ +public class DmaapSimActivatorTest { + + private DmaapSimActivator activator; + + /** + * Initializes an activator. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + Registry.newRegistry(); + HttpServletServerFactoryInstance.getServerFactory().destroy(); + + final String[] papConfigParameters = {"-c", "parameters/NormalParameters.json"}; + final DmaapSimCommandLineArguments arguments = new DmaapSimCommandLineArguments(papConfigParameters); + final DmaapSimParameterGroup parGroup = new DmaapSimParameterHandler().getParameters(arguments); + + activator = new DmaapSimActivator(parGroup); + } + + /** + * Method for cleanup after each test. + * + * @throws Exception if an error occurs + */ + @After + public void teardown() throws Exception { + if (activator != null && activator.isAlive()) { + activator.stop(); + } + } + + @Test + public void testDmaapSimActivator() { + assertFalse(activator.isAlive()); + activator.start(); + assertTrue(activator.isAlive()); + + // repeat - should throw an exception + assertThatIllegalStateException().isThrownBy(() -> activator.start()); + assertTrue(activator.isAlive()); + } + + @Test + public void testTerminate() { + activator.start(); + activator.stop(); + assertFalse(activator.isAlive()); + + // repeat - should throw an exception + assertThatIllegalStateException().isThrownBy(() -> activator.stop()); + assertFalse(activator.isAlive()); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java new file mode 100644 index 000000000..b8e285a99 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.startstop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.startstop.Main; +import org.onap.policy.sim.dmaap.parameters.CommonTestData; + +/** + * Class to perform unit test of {@link Main}}. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class MainTest { + private Main main; + + /** + * Set up. + */ + @Before + public void setUp() { + main = null; + HttpServletServerFactoryInstance.getServerFactory().destroy(); + } + + /** + * Shuts "main" down. + * + * @throws Exception if an error occurs + */ + @After + public void tearDown() throws Exception { + if (main != null) { + main.shutdown(); + } + } + + @Test + public void testMain() throws DmaapSimException { + final String[] NormalParameters = {"-c", "parameters/NormalParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters().isValid()); + assertEquals(CommonTestData.SIM_GROUP_NAME, main.getParameters().getName()); + + main.shutdown(); + } + + @Test + public void testMain_NoArguments() { + final String[] NormalParameters = {}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } + + @Test + public void testMain_InvalidArguments() { + // note: this is missing the "-c" argument, thus the ARGUMENTS are invalid + final String[] NormalParameters = {"parameters/NormalParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } + + @Test + public void testMain_Help() { + final String[] NormalParameters = {"-h"}; + Main.main(NormalParameters); + } + + @Test + public void testMain_InvalidParameters() { + final String[] NormalParameters = {"-c", "parameters/InvalidParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json index aeedf9d6e..a1c98a5b1 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json @@ -1,5 +1,6 @@ -{ +{ "name":"DMaapSim", + "topicSweepSec": 1, "restServerParameters":{ "host":"0.0.0.0", "port":6845 diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json index a2a036645..deec966e8 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json @@ -1,5 +1,6 @@ { "name": "DMaapSim", + "topicSweepSec": 300, "restServerParameters": { "host": "0.0.0.0", "port": 6845 diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json index fba033e52..51e9458b0 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json @@ -1,5 +1,6 @@ { "name":" ", + "topicSweepSec": 1, "restServerParameters":{ "host":"0.0.0.0", "port":6969, diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json new file mode 100644 index 000000000..77a320f6d --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json @@ -0,0 +1,36 @@ +{ + "topicSources": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap", + "fetchTimeout": 100 + }, + { + "topic": "MY-TOPIC-B", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap", + "fetchTimeout": 100 + } + ], + "topicSinks": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap" + }, + { + "topic": "MY-TOPIC-B", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap" + } + ] +}
\ No newline at end of file |