summaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap/src
diff options
context:
space:
mode:
Diffstat (limited to 'models-sim/models-sim-dmaap/src')
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java11
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java5
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java6
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java190
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java215
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java201
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java170
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java76
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java100
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java63
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java66
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java25
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java40
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java13
-rw-r--r--models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json1
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java39
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java305
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java287
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java213
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java63
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java145
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java181
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java94
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java81
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java199
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java89
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java (renamed from models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java)24
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java70
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java95
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java100
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json0
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json3
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json1
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json1
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json36
35 files changed, 2793 insertions, 415 deletions
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
index caae287b8..11da57582 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +22,8 @@
package org.onap.policy.models.sim.dmaap.parameters;
import lombok.Getter;
-
import org.onap.policy.common.parameters.ParameterGroupImpl;
+import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
@@ -36,6 +37,14 @@ public class DmaapSimParameterGroup extends ParameterGroupImpl {
private RestServerParameters restServerParameters;
/**
+ * Frequency, in seconds, with which to sweep the topics of idle consumers. On each
+ * sweep cycle, if a consumer group has had no new poll requests since the last sweep
+ * cycle, it is removed.
+ */
+ @Min(1)
+ private long topicSweepSec;
+
+ /**
* Create the DMaaP simulator parameter group.
*
* @param name the parameter group name
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
index 8eb76ec00..252054504 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,7 +39,7 @@ public class DmaapSimParameterHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimParameterHandler.class);
- private static final Coder CODER = new StandardCoder();
+ private final Coder coder = new StandardCoder();
/**
* Read the parameters from the parameter file.
@@ -54,7 +55,7 @@ public class DmaapSimParameterHandler {
try {
// Read the parameters from JSON
File file = new File(arguments.getFullConfigurationFilePath());
- dmaapSimParameterGroup = CODER.decode(file, DmaapSimParameterGroup.class);
+ dmaapSimParameterGroup = coder.decode(file, DmaapSimParameterGroup.class);
} catch (final CoderException e) {
final String errorMessage = "error reading parameters from \"" + arguments.getConfigurationFilePath()
+ "\"\n" + "(" + e.getClass().getSimpleName() + "):" + e.getMessage();
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
index c7269f66d..8fb2cffa3 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,13 +22,14 @@
package org.onap.policy.models.sim.dmaap.parameters;
import lombok.Getter;
+
import org.onap.policy.common.parameters.ParameterGroupImpl;
import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
/**
- * Class to hold all parameters needed for DMaaP simulator rest server.
+ * Class to hold all parameters needed for rest server.
*/
@NotNull
@NotBlank
@@ -39,6 +41,6 @@ public class RestServerParameters extends ParameterGroupImpl {
private int port;
public RestServerParameters() {
- super("RestServerParameters");
+ super(RestServerParameters.class.getSimpleName());
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java
new file mode 100644
index 000000000..737151339
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java
@@ -0,0 +1,190 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a consumer group. All consumer instances within a group share the
+ * same data object.
+ */
+public class ConsumerGroupData {
+ private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupData.class);
+
+ /**
+ * Returned when messages can no longer be read from this consumer group object,
+ * because it is being removed from the topic. {@link #UNREADABLE_LIST} must not be
+ * the same list as Collections.emptyList(), thus we wrap it.
+ */
+ public static final List<String> UNREADABLE_LIST = Collections.unmodifiableList(Collections.emptyList());
+
+ /**
+ * Returned when there are no messages read. Collections.emptyList() is already
+ * unmodifiable, thus no need to wrap it.
+ */
+ private static final List<String> EMPTY_LIST = Collections.emptyList();
+
+ /**
+ * This is locked while fields other than {@link #messageQueue} are updated.
+ */
+ private final Object lockit = new Object();
+
+ /**
+ * Number of sweep cycles that have occurred since a consumer has attempted to read
+ * from the queue. This consumer group should be removed once this count exceeds
+ * {@code 1}, provided {@link #nreaders} is zero.
+ */
+ private int nsweeps = 0;
+
+ /**
+ * Number of consumers that are currently attempting to read from the queue. This
+ * consumer group should not be removed as long as this is non-zero.
+ */
+ private int nreaders = 0;
+
+ /**
+ * Message queue.
+ */
+ private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param topicName name of the topic with which this object is associated
+ * @param groupName name of the consumer group with which this object is associated
+ */
+ public ConsumerGroupData(String topicName, String groupName) {
+ logger.info("Topic {}: add consumer group: {}", topicName, groupName);
+ }
+
+ /**
+ * Determines if this consumer group should be removed. This should be invoked once
+ * during each sweep cycle. When this returns {@code true}, this consumer group should
+ * be immediately discarded, as any readers will sit in a spin loop waiting for it to
+ * be discarded.
+ *
+ * @return {@code true} if this consumer group should be removed, {@code false}
+ * otherwise
+ */
+ public boolean shouldRemove() {
+ synchronized (lockit) {
+ return (nreaders == 0 && ++nsweeps > 1);
+ }
+ }
+
+ /**
+ * Reads messages from the queue, blocking if necessary.
+ *
+ * @param maxRead maximum number of messages to read
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return a list of messages read from the queue, empty if no messages became
+ * available before the wait time elapsed, or {@link #UNREADABLE_LIST} if this
+ * consumer group object is no longer active
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * first message
+ */
+ public List<String> read(int maxRead, long waitMs) throws InterruptedException {
+
+ synchronized (lockit) {
+ if (nsweeps > 1 && nreaders == 0) {
+ // cannot use this consumer group object anymore
+ return UNREADABLE_LIST;
+ }
+
+ ++nreaders;
+ }
+
+ /*
+ * Note: do EVERYTHING inside of the "try" block, so that the "finally" block can
+ * update the reader count.
+ *
+ * Do NOT hold the lockit while we're polling, as poll() may block for a while.
+ */
+ try {
+ // always read at least one message
+ int maxRead2 = Math.max(1, maxRead);
+ long waitMs2 = Math.max(0, waitMs);
+
+ // perform a blocking read of the queue
+ String obj = getNextMessage(waitMs2);
+ if (obj == null) {
+ return EMPTY_LIST;
+ }
+
+ /*
+ * List should hold all messages from the queue PLUS the one we already have.
+ * Note: it's possible for additional messages to be added to the queue while
+ * we're reading from it. In that case, the list will grow as needed.
+ */
+ List<String> lst = new ArrayList<>(Math.min(maxRead2, messageQueue.size() + 1));
+ lst.add(obj);
+
+ // perform NON-blocking read of subsequent messages
+ for (int x = 1; x < maxRead2; ++x) {
+ if ((obj = messageQueue.poll()) == null) {
+ break;
+ }
+
+ lst.add(obj);
+ }
+
+ return lst;
+
+ } finally {
+ synchronized (lockit) {
+ --nreaders;
+ nsweeps = 0;
+ }
+ }
+ }
+
+ /**
+ * Writes messages to the queue.
+ *
+ * @param messages messages to be written to the queue
+ */
+ public void write(List<String> messages) {
+ messageQueue.addAll(messages);
+ }
+
+ // the following methods may be overridden by junit tests
+
+ /**
+ * Gets the next message from the queue.
+ *
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return the next message, or {@code null} if no messages became available before
+ * the wait time elapsed
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * message
+ */
+ protected String getNextMessage(long waitMs) throws InterruptedException {
+ return messageQueue.poll(waitMs, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
index 9de29cdac..d11d1b397 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,18 +21,20 @@
package org.onap.policy.models.sim.dmaap.provider;
+import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory;
*
* @author Liam Fallon (liam.fallon@est.tech)
*/
-public class DmaapSimProvider {
+public class DmaapSimProvider extends ServiceManagerContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
- // Recurring string constants
- private static final String TOPIC_TAG = "Topic:";
+ @Getter
+ @Setter
+ private static DmaapSimProvider instance;
- // Time for a get to wait before checking of a message has come
- private static final long DMAAP_SIM_WAIT_TIME = 50;
+ /**
+ * Maps a topic name to its data.
+ */
+ private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
- // recurring constants
- private static final String WITH_TIMEOUT = " with timeout ";
+ /**
+ * Thread used to remove idle consumers from the topics.
+ */
+ private ScheduledExecutorService timerPool;
- // The map of topic messages
- private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
- // The map of topic messages
- private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
- new LinkedHashMap<>();
+ /**
+ * Constructs the object.
+ *
+ * @param params parameters
+ */
+ public DmaapSimProvider(DmaapSimParameterGroup params) {
+ addAction("Topic Sweeper", () -> {
+ timerPool = makeTimerPool();
+ timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
+ TimeUnit.SECONDS);
+ }, () -> timerPool.shutdown());
+ }
/**
* Process a DMaaP message.
*
- * @param topicName The topic name
+ * @param topicName the topic name
* @param dmaapMessage the message to process
* @return a response to the message
*/
+ @SuppressWarnings("unchecked")
public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null) {
- messageMap = new TreeMap<>();
- topicMessageMap.put(topicName, messageMap);
- LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
- }
+ LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
- int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
+ List<Object> lst;
- messageMap.put(nextKey, dmaapMessage);
- LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+ if (dmaapMessage instanceof List) {
+ lst = (List<Object>) dmaapMessage;
+ } else {
+ lst = Collections.singletonList(dmaapMessage);
}
- return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
+ TopicData topic = topic2data.get(topicName);
+
+ /*
+ * Write all messages and return the count. If the topic doesn't exist yet, then
+ * there are no subscribers to receive the messages, thus treat it as if all
+ * messages were published.
+ */
+ int nmessages = (topic != null ? topic.write(lst) : lst.size());
+
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("serverTimeMs", 0);
+ map.put("count", nmessages);
+
+ return Response.status(Response.Status.OK).entity(map).build();
}
/**
@@ -92,102 +115,66 @@ public class DmaapSimProvider {
* @param topicName The topic to wait on
* @param consumerGroup the consumer group that is waiting
* @param consumerId the consumer ID that is waiting
- * @param timeout the length of time to wait for
+ * @param limit the maximum number of messages to get
+ * @param timeoutMs the length of time to wait for
* @return the DMaaP message or
*/
public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
- final int timeout) {
+ final int limit, final long timeoutMs) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
+ LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
+ consumerId, limit, timeoutMs);
- MutablePair<Integer, String> consumerGroupPair = null;
-
- synchronized (consumerGroupsMap) {
- Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
- if (consumerGroupMap == null) {
- consumerGroupMap = new LinkedHashMap<>();
- consumerGroupsMap.put(topicName, consumerGroupMap);
- LOGGER.trace(
- TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
- }
-
- consumerGroupPair = consumerGroupMap.get(consumerGroup);
- if (consumerGroupPair == null) {
- consumerGroupPair = new MutablePair<>(-1, consumerId);
- consumerGroupMap.put(consumerGroup, consumerGroupPair);
- LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
- + consumerGroup + ":" + consumerId);
- }
- }
+ try {
+ List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+ timeoutMs);
- long timeOfTimeout = System.currentTimeMillis() + timeout;
+ if (lst.isEmpty() && timeoutMs > 0) {
+ LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
+ return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
- do {
- Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
- if (waitingMessages != null) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
- return Response.status(Response.Status.OK).entity(waitingMessages).build();
+ } else {
+ LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
+ consumerId);
+ return Response.status(Status.OK).entity(lst).build();
}
- try {
- TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
- } catch (InterruptedException ie) {
- String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
- + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
- LOGGER.warn(errorMessage, ie);
- Thread.currentThread().interrupt();
- throw new DmaapSimRuntimeException(errorMessage, ie);
- }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
+ consumerId, e);
+ Thread.currentThread().interrupt();
+ return Response.status(Status.GONE).entity(Collections.emptyList()).build();
}
- while (timeOfTimeout > System.currentTimeMillis());
-
- LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
- return Response.status(Response.Status.REQUEST_TIMEOUT).build();
}
/**
- * Return any messages on this topic with a message number greater than the supplied message number.
- *
- * @param topicName the topic name to check
- * @param consumerGroupPair the pair with the information on the last message retrieved
- * @return the messages or null if there are none
+ * Task to remove idle consumers from each topic.
*/
- private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
- String foundMessageList = "[";
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
- return null;
- }
+ private class SweeperTask implements Runnable {
+ @Override
+ public void run() {
+ topic2data.values().forEach(TopicData::removeIdleConsumers);
+ }
+ }
- boolean first = true;
- for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
- if (first) {
- first = false;
- } else {
- foundMessageList += ",";
- }
- try {
- foundMessageList += new StandardCoder().encode(dmaapMessage);
- } catch (CoderException ce) {
- String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
- LOGGER.warn(errorMessage, ce);
- return null;
- }
- }
- foundMessageList += ']';
+ // the following methods may be overridden by junit tests
- LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
- + " to " + messageMap.lastKey());
- synchronized (consumerGroupsMap) {
- consumerGroupPair.setLeft(messageMap.lastKey());
- }
- }
+ /**
+ * Makes a new timer pool.
+ *
+ * @return a new timer pool
+ */
+ protected ScheduledExecutorService makeTimerPool() {
+ return Executors.newScheduledThreadPool(1);
+ }
- return (foundMessageList.length() < 3 ? null : foundMessageList);
+ /**
+ * Makes a new topic.
+ *
+ * @param topicName topic name
+ * @return a new topic
+ */
+ protected TopicData makeTopicData(String topicName) {
+ return new TopicData(topicName);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java
new file mode 100644
index 000000000..2737f455b
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java
@@ -0,0 +1,201 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a topic.
+ *
+ * <p/>
+ * Note: for ease of implementation, this adds a topic when a consumer polls it rather
+ * than when a publisher writes to it. This is the opposite of how the real DMaaP works.
+ * As a result, this will never return a topic-not-found message to the consumer.
+ */
+public class TopicData {
+ private static final Logger logger = LoggerFactory.getLogger(TopicData.class);
+
+ /**
+ * Name of the topic with which this data is associated.
+ */
+ private final String topicName;
+
+ /**
+ * Maps a consumer group name to its associated data.
+ */
+ private final Map<String, ConsumerGroupData> group2data = new ConcurrentHashMap<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param topicName name of the topic with which this object is associated
+ */
+ public TopicData(String topicName) {
+ logger.info("Topic {}: added", topicName);
+ this.topicName = topicName;
+ }
+
+ /**
+ * Removes idle consumers from the topic. This is typically called once during each
+ * sweep cycle.
+ */
+ public void removeIdleConsumers() {
+ Iterator<Entry<String, ConsumerGroupData>> iter = group2data.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String, ConsumerGroupData> ent = iter.next();
+ if (ent.getValue().shouldRemove()) {
+ /*
+ * We want the minimum amount of time to elapse between invoking
+ * shouldRemove() and iter.remove(), thus all other statements (e.g.,
+ * logging) should be done AFTER iter.remove().
+ */
+ iter.remove();
+
+ logger.info("Topic {}: removed consumer group: {}", topicName, ent.getKey());
+ }
+ }
+ }
+
+ /**
+ * Reads from a particular consumer group's queue.
+ *
+ * @param consumerGroup name of the consumer group from which to read
+ * @param maxRead maximum number of messages to read
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return a list of messages read from the queue, empty if no messages became
+ * available before the wait time elapsed
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * first message
+ */
+ public List<String> read(String consumerGroup, int maxRead, long waitMs) throws InterruptedException {
+ /*
+ * It's possible that this thread may spin several times while waiting for
+ * removeIdleConsumers() to complete its call to iter.remove(), thus we create
+ * this closure once, rather than each time through the loop.
+ */
+ Function<String, ConsumerGroupData> maker = this::makeData;
+
+ // loop until we get a readable list
+ List<String> result;
+
+ // @formatter:off
+
+ do {
+ result = group2data.computeIfAbsent(consumerGroup, maker).read(maxRead, waitMs);
+ }
+ while (result == ConsumerGroupData.UNREADABLE_LIST);
+
+ // @formatter:on
+
+ return result;
+ }
+
+ /**
+ * Writes messages to the queues of every consumer group.
+ *
+ * @param messages messages to be written to the queues
+ * @return the number of messages enqueued
+ */
+ public int write(List<Object> messages) {
+ List<String> list = convertMessagesToStrings(messages);
+
+ /*
+ * We don't care if a consumer group is deleted from the map while we're adding
+ * messages to it, as those messages will simply be ignored (and discarded by the
+ * garbage collector).
+ */
+ for (ConsumerGroupData data : group2data.values()) {
+ data.write(list);
+ }
+
+ return list.size();
+ }
+
+ /**
+ * Converts a list of message objects to a list of message strings. If a message
+ * cannot be converted for some reason, then it is not added to the result list, thus
+ * the result list may be shorted than the original input list.
+ *
+ * @param messages objects to be converted
+ * @return a list of message strings
+ */
+ protected List<String> convertMessagesToStrings(List<Object> messages) {
+ Coder coder = new StandardCoder();
+ List<String> list = new ArrayList<>(messages.size());
+
+ for (Object msg : messages) {
+ String str = convertMessageToString(msg, coder);
+ if (str != null) {
+ list.add(str);
+ }
+ }
+
+ return list;
+ }
+
+ /**
+ * Converts a message object to a message string.
+ *
+ * @param message message to be converted
+ * @param coder used to encode the message as a string
+ * @return the message string, or {@code null} if it cannot be converted
+ */
+ protected String convertMessageToString(Object message, Coder coder) {
+ if (message == null) {
+ return null;
+ }
+
+ if (message instanceof String) {
+ return message.toString();
+ }
+
+ try {
+ return coder.encode(message);
+ } catch (CoderException e) {
+ logger.warn("cannot encode {}", message, e);
+ return null;
+ }
+ }
+
+ // this may be overridden by junit tests
+
+ /**
+ * Makes data for a consumer group.
+ *
+ * @param consumerGroup name of the consumer group to make
+ * @return new consumer group data
+ */
+ protected ConsumerGroupData makeData(String consumerGroup) {
+ return new ConsumerGroupData(topicName, consumerGroup);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
index e269ac00b..dbd9422ef 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
@@ -1,66 +1,178 @@
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.models.sim.dmaap.rest;
import java.io.BufferedReader;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.Reader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
/**
- * Provider that serializes and de-serializes JSON via gson.
+ * Provider that decodes "application/cambria" messages.
*/
@Provider
@Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
-@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
public class CambriaMessageBodyHandler implements MessageBodyReader<Object> {
- // Media type for Cambria
public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria";
- public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class);
+ /**
+ * Maximum length of a message or partition.
+ */
+ private static final int MAX_LEN = 10000000;
+
+ /**
+ * Maximum digits in a length field.
+ */
+ private static final int MAX_DIGITS = 10;
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString());
+ return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()));
}
@Override
- public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
- MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
- throws IOException {
-
- String cambriaString = "";
- try (BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(entityStream))) {
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- cambriaString += line;
+ public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+ List<Object> messages = new LinkedList<>();
+ String msg;
+ while ((msg = readMessage(bufferedReader)) != null) {
+ messages.add(msg);
+ }
+
+ return messages;
+ }
+ }
+
+ /**
+ * Reads a message.
+ *
+ * @param reader source from which to read
+ * @return the message that was read, or {@code null} if there are no more messages
+ * @throws IOException if an error occurs
+ */
+ private String readMessage(Reader reader) throws IOException {
+ if (!skipWhitespace(reader)) {
+ return null;
+ }
+
+ int partlen = readLength(reader);
+ if (partlen > MAX_LEN) {
+ throw new IOException("invalid partition length");
+ }
+
+ int msglen = readLength(reader);
+ if (msglen > MAX_LEN) {
+ throw new IOException("invalid message length");
+ }
+
+ // skip over the partition
+ reader.skip(partlen);
+
+ return readString(reader, msglen);
+ }
+
+ /**
+ * Skips whitespace.
+ *
+ * @param reader source from which to read
+ * @return {@code true} if there is another character after the whitespace,
+ * {@code false} if the end of the stream has been reached
+ * @throws IOException if an error occurs
+ */
+ private boolean skipWhitespace(Reader reader) throws IOException {
+ int chr;
+
+ do {
+ reader.mark(1);
+ if ((chr = reader.read()) < 0) {
+ return false;
+ }
+ }
+ while (Character.isWhitespace(chr));
+
+ // push the last character back onto the reader
+ reader.reset();
+
+ return true;
+ }
+
+ /**
+ * Reads a length field, which is a number followed by ".".
+ *
+ * @param reader source from which to read
+ * @return the length, or -1 if EOF has been reached
+ * @throws IOException if an error occurs
+ */
+ private int readLength(Reader reader) throws IOException {
+ StringBuilder bldr = new StringBuilder(MAX_DIGITS);
+
+ int chr;
+ for (int x = 0; x < MAX_DIGITS; ++x) {
+ if ((chr = reader.read()) < 0) {
+ throw new EOFException("missing '.' in 'length' field");
}
- return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length());
+ if (chr == '.') {
+ String text = bldr.toString().trim();
+ return (text.isEmpty() ? 0 : Integer.valueOf(text));
+ }
+
+ if (!Character.isDigit(chr)) {
+ throw new IOException("invalid character in 'length' field");
+ }
+
+ bldr.append((char) chr);
}
+
+ throw new IOException("too many digits in 'length' field");
+ }
+
+ /**
+ * Reads a string.
+ *
+ * @param reader source from which to read
+ * @param len length of the string (i.e., number of characters to read)
+ * @return the string that was read
+ * @throws IOException if an error occurs
+ */
+ private String readString(Reader reader, int len) throws IOException {
+ char[] buf = new char[len];
+ IOUtils.readFully(reader, buf);
+
+ return new String(buf);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
index e3fdd4884..8339d0e64 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,24 +21,24 @@
package org.onap.policy.models.sim.dmaap.rest;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
-
import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
/**
* Class to provide REST endpoints for DMaaP simulator component statistics.
*/
@Path("/events")
+@Produces(DmaapSimRestControllerV1.MEDIA_TYPE_APPLICATION_JSON)
public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
+ public static final String MEDIA_TYPE_APPLICATION_JSON = "application/json";
/**
* Get a DMaaP message.
@@ -45,72 +46,33 @@ public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
* @param topicName topic to get message from
* @param consumerGroup consumer group that is getting the message
* @param consumerId consumer ID that is getting the message
- * @param timeout timeout for the message
+ * @param timeoutMs timeout for the message
* @return the message
*/
@GET
@Path("{topicName}/{consumerGroup}/{consumerId}")
- // @formatter:off
- @ApiOperation(
- value = "Get a DMaaP event on a topic",
- notes = "Returns an event on a DMaaP topic",
- response = Object.class,
- authorizations =
- @Authorization(value = AUTHORIZATION_TYPE)
- )
- @ApiResponses(
- value = {
- @ApiResponse(
- code = AUTHENTICATION_ERROR_CODE,
- message = AUTHENTICATION_ERROR_MESSAGE),
- @ApiResponse(
- code = AUTHORIZATION_ERROR_CODE,
- message = AUTHORIZATION_ERROR_MESSAGE),
- @ApiResponse(
- code = SERVER_ERROR_CODE,
- message = SERVER_ERROR_MESSAGE)
- }
- )
- // @formatter:on
- public Response getDmaaapMessage(@PathParam("topicName") final String topicName,
- @PathParam("consumerGroup") final String consumerGroup, @PathParam("consumerId") final String consumerId,
- @QueryParam("timeout") final int timeout) {
+ public Response getDmaapMessage(@PathParam("topicName") final String topicName,
+ @PathParam("consumerGroup") final String consumerGroup,
+ @PathParam("consumerId") final String consumerId,
+ @QueryParam("limit") @DefaultValue("1") final int limit,
+ @QueryParam("timeout") @DefaultValue("15000") final long timeoutMs) {
- return new DmaapSimProvider().processDmaapMessageGet(topicName, consumerGroup, consumerId, timeout);
+ return DmaapSimProvider.getInstance().processDmaapMessageGet(topicName, consumerGroup, consumerId, limit,
+ timeoutMs);
}
/**
* Post a DMaaP message.
*
- * @param topicName topic to get message from415
+ * @param topicName topic to get message from
* @return the response to the post
*/
@POST
@Path("{topicName}")
- // @formatter:off
- @ApiOperation(
- value = "Post a DMaaP event on a topic",
- notes = "Returns an event on a DMaaP topic",
- response = Response.class,
- authorizations =
- @Authorization(value = AUTHORIZATION_TYPE)
- )
- @ApiResponses(
- value = {
- @ApiResponse(
- code = AUTHENTICATION_ERROR_CODE,
- message = AUTHENTICATION_ERROR_MESSAGE),
- @ApiResponse(
- code = AUTHORIZATION_ERROR_CODE,
- message = AUTHORIZATION_ERROR_MESSAGE),
- @ApiResponse(
- code = SERVER_ERROR_CODE,
- message = SERVER_ERROR_MESSAGE)
- }
- )
- // @formatter:on
- public Response postDmaaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
+ @Consumes(value = {CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA,
+ TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN, MEDIA_TYPE_APPLICATION_JSON})
+ public Response postDmaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
- return new DmaapSimProvider().processDmaapMessagePut(topicName, dmaapMessage);
+ return DmaapSimProvider.getInstance().processDmaapMessagePut(topicName, dmaapMessage);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
index 28de42c21..b05a0fe1a 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
@@ -21,28 +21,21 @@
package org.onap.policy.models.sim.dmaap.rest;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
-import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
import org.onap.policy.models.sim.dmaap.parameters.RestServerParameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Class to manage life cycle of DMaaP Simulator rest server.
*/
-public class DmaapSimRestServer implements Startable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimRestServer.class);
+public class DmaapSimRestServer extends ServiceManagerContainer {
- private List<HttpServletServer> servers = new ArrayList<>();
-
- private RestServerParameters restServerParameters;
+ private final List<HttpServletServer> servers;
/**
* Constructor for instantiating DmaapSimRestServer.
@@ -50,91 +43,40 @@ public class DmaapSimRestServer implements Startable {
* @param restServerParameters the rest server parameters
*/
public DmaapSimRestServer(final RestServerParameters restServerParameters) {
- this.restServerParameters = restServerParameters;
- }
+ this.servers = HttpServletServerFactoryInstance.getServerFactory()
+ .build(getServerProperties(restServerParameters));
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean start() {
- try {
- servers = HttpServletServerFactoryInstance.getServerFactory().build(getServerProperties());
- for (final HttpServletServer server : servers) {
- server.start();
- }
- } catch (final Exception exp) {
- LOGGER.error("Failed to start DMaaP simulator http server", exp);
- return false;
+ for (HttpServletServer server : this.servers) {
+ addAction("REST " + server.getName(), server::start, server::stop);
}
- return true;
}
/**
- * Creates the server properties object using restServerParameters.
+ * Creates a set of properties, suitable for building a REST server, from the
+ * parameters.
*
- * @return the properties object
+ * @param restServerParameters parameters from which to build the properties
+ * @return a set of properties representing the given parameters
*/
- private Properties getServerProperties() {
+ public static Properties getServerProperties(RestServerParameters restServerParameters) {
final Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName());
final String svcpfx =
- PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
+ PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost());
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
- Integer.toString(restServerParameters.getPort()));
+ Integer.toString(restServerParameters.getPort()));
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- String.join(",", DmaapSimRestControllerV1.class.getName()));
+ DmaapSimRestControllerV1.class.getName());
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
- props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true");
+ props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
+
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
- CambriaMessageBodyHandler.class.getName() + "," + JsonMessageBodyHandler.class.getName());
+ String.join(",", CambriaMessageBodyHandler.class.getName(),
+ GsonMessageBodyHandler.class.getName(),
+ TextMessageBodyHandler.class.getName()));
return props;
}
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean stop() {
- for (final HttpServletServer server : servers) {
- try {
- server.stop();
- } catch (final Exception exp) {
- LOGGER.error("Failed to stop DMaaP simulator http server", exp);
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public void shutdown() {
- stop();
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean isAlive() {
- return !servers.isEmpty();
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("DmaapSimRestServer [servers=");
- builder.append(servers);
- builder.append("]");
- return builder.toString();
- }
-
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java
deleted file mode 100644
index a3eebda00..000000000
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.models.sim.dmaap.rest;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provider that serializes and de-serializes JSON via gson.
- */
-@Provider
-@Consumes(MediaType.APPLICATION_JSON)
-@Produces(MediaType.APPLICATION_JSON)
-public class JsonMessageBodyHandler implements MessageBodyReader<Object> {
- public static final Logger logger = LoggerFactory.getLogger(JsonMessageBodyHandler.class);
-
- @Override
- public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return MediaType.APPLICATION_JSON.equals(mediaType.toString());
- }
-
- @Override
- public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
- MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
- throws IOException {
-
- String jsonString = "";
- try (BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(entityStream))) {
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- jsonString += line;
- }
-
- return jsonString;
- }
- }
-}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java
new file mode 100644
index 000000000..3c903c82b
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * Provider that decodes "text/plain" messages.
+ */
+@Provider
+@Consumes(TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN)
+public class TextMessageBodyHandler implements MessageBodyReader<Object> {
+ public static final String MEDIA_TYPE_TEXT_PLAIN = "text/plain";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
+ return (mediaType != null && MEDIA_TYPE_TEXT_PLAIN.equals(mediaType.toString()));
+ }
+
+ @Override
+ public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+ List<Object> messages = new LinkedList<>();
+ String msg;
+ while ((msg = bufferedReader.readLine()) != null) {
+ messages.add(msg);
+ }
+
+ return messages;
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
index 899c0e081..b9e0efaaf 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,19 +21,15 @@
package org.onap.policy.models.sim.dmaap.startstop;
-import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer;
/**
* This class activates the DMaaP simulator as a complete service.
*/
public class DmaapSimActivator extends ServiceManagerContainer {
- /**
- * The DMaaP simulator REST API server.
- */
- private DmaapSimRestServer restServer;
/**
* Instantiate the activator for the DMaaP simulator as a complete service.
@@ -42,19 +39,11 @@ public class DmaapSimActivator extends ServiceManagerContainer {
public DmaapSimActivator(final DmaapSimParameterGroup dmaapSimParameterGroup) {
super("DMaaP Simulator");
- // @formatter:off
- addAction("DMaaP Simulator parameters",
- () -> ParameterService.register(dmaapSimParameterGroup),
- () -> ParameterService.deregister(dmaapSimParameterGroup.getName()));
-
- addAction("Create REST server",
- () -> restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()),
- () -> restServer = null
- );
+ DmaapSimProvider provider = new DmaapSimProvider(dmaapSimParameterGroup);
+ DmaapSimProvider.setInstance(provider);
+ addAction("Sim Provider", provider::start, provider::stop);
- addAction("REST server",
- () -> restServer.start(),
- () -> restServer.stop());
- // @formatter:on
+ DmaapSimRestServer restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters());
+ addAction("REST server", restServer::start, restServer::stop);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
index cf559f712..724c3dc35 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
@@ -26,13 +26,15 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.Arrays;
-
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.utils.resources.ResourceUtils;
import org.onap.policy.models.sim.dmaap.DmaapSimException;
import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
@@ -46,6 +48,9 @@ public class DmaapSimCommandLineArguments {
private static final int HELP_LINE_LENGTH = 120;
private final Options options;
+
+ @Getter
+ @Setter
private String configurationFilePath = null;
/**
@@ -145,7 +150,7 @@ public class DmaapSimCommandLineArguments {
* @throws DmaapSimException on command argument validation errors
*/
public void validate() throws DmaapSimException {
- validateReadableFile("DMaaP simulator configuration", configurationFilePath);
+ validateFileExists("DMaaP simulator configuration", configurationFilePath);
}
/**
@@ -175,15 +180,6 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Gets the configuration file path.
- *
- * @return the configuration file path
- */
- public String getConfigurationFilePath() {
- return configurationFilePath;
- }
-
- /**
* Gets the full expanded configuration file path.
*
* @return the configuration file path
@@ -193,16 +189,6 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Sets the configuration file path.
- *
- * @param configurationFilePath the configuration file path
- */
- public void setConfigurationFilePath(final String configurationFilePath) {
- this.configurationFilePath = configurationFilePath;
-
- }
-
- /**
* Check set configuration file path.
*
* @return true, if check set configuration file path
@@ -212,14 +198,14 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Validate readable file.
+ * Validate file exists.
*
* @param fileTag the file tag
* @param fileName the file name
* @throws DmaapSimException on the file name passed as a parameter
*/
- private void validateReadableFile(final String fileTag, final String fileName) throws DmaapSimException {
- if (fileName == null || fileName.length() == 0) {
+ private void validateFileExists(final String fileTag, final String fileName) throws DmaapSimException {
+ if (StringUtils.isBlank(fileName)) {
throw new DmaapSimException(fileTag + " file was not specified as an argument");
}
@@ -233,11 +219,5 @@ public class DmaapSimCommandLineArguments {
if (!theFile.exists()) {
throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist");
}
- if (!theFile.isFile()) {
- throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file");
- }
- if (!theFile.canRead()) {
- throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable");
- }
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
index 878d008a8..7b4f41b07 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,9 +22,6 @@
package org.onap.policy.models.sim.dmaap.startstop;
import java.util.Arrays;
-
-import org.onap.policy.common.utils.services.Registry;
-import org.onap.policy.models.sim.dmaap.DmaapSimConstants;
import org.onap.policy.models.sim.dmaap.DmaapSimException;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
@@ -75,14 +73,12 @@ public class Main {
// Now, create the activator for the DMaaP Simulator service
activator = new DmaapSimActivator(parameterGroup);
- Registry.register(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR, activator);
// Start the activator
try {
activator.start();
} catch (final RuntimeException e) {
LOGGER.error("start of DMaaP simulator service failed, used parameters are {}", Arrays.toString(args), e);
- Registry.unregister(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR);
return;
}
@@ -110,7 +106,7 @@ public class Main {
parameterGroup = null;
// clear the DMaaP simulator activator
- if (activator != null) {
+ if (activator != null && activator.isAlive()) {
activator.stop();
}
}
@@ -128,8 +124,9 @@ public class Main {
public void run() {
try {
// Shutdown the DMaaP simulator service and wait for everything to stop
- activator.stop();
- } catch (final RuntimeException e) {
+ shutdown();
+
+ } catch (final RuntimeException | DmaapSimException e) {
LOGGER.warn("error occured during shut down of the DMaaP simulator service", e);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
index dd2477a24..e936eb034 100644
--- a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
+++ b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
@@ -1,5 +1,6 @@
{
"name": "DMaapSim",
+ "topicSweepSec": 900,
"restServerParameters": {
"host": "0.0.0.0",
"port": 3904
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java
new file mode 100644
index 000000000..4e37a5e36
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+
+public class DmaapSimXxxExceptionTest {
+
+ @Test
+ public void testDmaapSimException() {
+ assertEquals(3, new ExceptionsTester().test(DmaapSimException.class));
+ }
+
+ @Test
+ public void testDmaapSimRuntimeException() {
+ assertEquals(3, new ExceptionsTester().test(DmaapSimRuntimeException.class));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java
new file mode 100644
index 000000000..4513ffb82
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java
@@ -0,0 +1,305 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerGroupDataTest {
+ private static final int WAIT_MS = 5000;
+ private static final int MIN_WAIT_MS = WAIT_MS / 2;
+ private static final String MY_TOPIC = "my-topic";
+ private static final String MY_CONSUMER = "my-consumer";
+ private static final String MSG1 = "hello";
+ private static final String MSG2 = "there";
+ private static final String MSG3 = "world";
+ private static final int MAX_THREADS = 30;
+
+ private MyData data;
+ private MyReader thread;
+ private List<MyReader> threads;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ data = new MyData();
+ thread = null;
+ threads = new ArrayList<>(MAX_THREADS);
+ }
+
+ /**
+ * Stops any running thread.
+ */
+ @After
+ public void tearDown() {
+ for (MyReader thr : threads) {
+ thr.interrupt();
+ }
+
+ for (MyReader thr : threads) {
+ thr.await();
+ }
+ }
+
+ @Test
+ public void testShouldRemove() throws InterruptedException {
+ assertFalse(data.shouldRemove());
+ assertTrue(data.shouldRemove());
+
+ data = new MyData();
+
+ // start a reader thread and wait for it to poll its queue
+ startReader(0, 10);
+ assertTrue(data.await());
+
+ assertFalse(data.shouldRemove());
+ }
+
+ @Test
+ public void testRead() {
+ data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3);
+
+ // this reader only wants one
+ startReader(1, 1);
+ assertTrue(thread.await());
+ assertEquals("[hello]", thread.result.toString());
+
+ // this reader wants three
+ startReader(3, 1);
+ assertTrue(thread.await());
+ assertEquals("[there, world, hello]", thread.result.toString());
+
+ // this reader wants three, but will only get two
+ startReader(3, 1);
+ assertTrue(thread.await());
+ assertEquals("[there, world]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_Idle() throws InterruptedException {
+ // force it to idle
+ data.shouldRemove();
+ data.shouldRemove();
+
+ long tbeg = System.currentTimeMillis();
+ assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS));
+
+ // should not have waited
+ assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS);
+ }
+
+ @Test
+ public void testRead_NegativeCount() throws InterruptedException {
+ data.enqueue(MSG1, MSG2);
+ startReader(-1, 3);
+ assertTrue(data.await());
+
+ // wait time should be unaffected
+ assertEquals(3L, data.waitMs2);
+
+ assertTrue(thread.await());
+
+ // should only return one message
+ assertEquals("[hello]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_NegativeWait() throws InterruptedException {
+ data.enqueue(MSG1, MSG2, MSG3);
+ startReader(2, -3);
+ assertTrue(data.await());
+
+ assertEquals(0L, data.waitMs2);
+
+ assertTrue(thread.await());
+
+ // should return two messages, as requested
+ assertEquals("[hello, there]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_NoMessages() throws InterruptedException {
+ startReader(0, 0);
+ assertTrue(data.await());
+
+ assertTrue(thread.await());
+ assertTrue(thread.result.isEmpty());
+ }
+
+ @Test
+ public void testRead_MultiThreaded() {
+ // queue up a bunch of messages
+ final int expected = MAX_THREADS * 3;
+ for (int x = 0; x < expected; ++x) {
+ data.enqueue(MSG1);
+ }
+
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ startReader(4, 1);
+ }
+
+ int actual = 0;
+ for (MyReader thr : threads) {
+ thr.await();
+ actual += thr.result.size();
+ }
+
+ assertEquals(expected, actual);
+ }
+
+
+ /**
+ * Starts a reader thread.
+ *
+ * @param limit number of messages to read at one time
+ * @param waitMs wait time, in milliseconds
+ */
+ private void startReader(int limit, long waitMs) {
+ thread = new MyReader(limit, waitMs);
+
+ thread.setDaemon(true);
+ thread.start();
+
+ threads.add(thread);
+ }
+
+
+ private class MyData extends ConsumerGroupData {
+
+ /**
+ * Decremented when {@link #getNextMessage(long)} is invoked.
+ */
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ * Messages to be added to the queue when {@link #getNextMessage(long)} is
+ * invoked.
+ */
+ private final List<String> messages = new ArrayList<>();
+
+ /**
+ * Value passed to {@link #getNextMessage(long)}.
+ */
+ private volatile long waitMs2 = -1;
+
+ /**
+ * Constructs the object.
+ */
+ public MyData() {
+ super(MY_TOPIC, MY_CONSUMER);
+ }
+
+ /**
+ * Arranges for messages to be injected into the queue the next time
+ * {@link #getNextMessage(long)} is invoked.
+ *
+ * @param messages the messages to be injected
+ */
+ public void enqueue(String... messages) {
+ this.messages.addAll(Arrays.asList(messages));
+ }
+
+ @Override
+ protected String getNextMessage(long waitMs) throws InterruptedException {
+ waitMs2 = waitMs;
+
+ latch.countDown();
+
+ synchronized (messages) {
+ write(messages);
+ messages.clear();
+ }
+
+ return super.getNextMessage(waitMs);
+ }
+
+ /**
+ * Waits for {@link #getNextMessage(long)} to be invoked.
+ *
+ * @return {@code true} if {@link #getNextMessage(long)} was invoked,
+ * {@code false} if the timer expired first
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ */
+ public boolean await() throws InterruptedException {
+ return latch.await(WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Thread that will invoke the consumer group's read() method one time.
+ */
+ private class MyReader extends Thread {
+ private final ConsumerGroupData group = data;
+ private final int limit;
+ private final long waitMs;
+
+ /**
+ * Result returned by the read() method.
+ */
+ private List<String> result = Collections.emptyList();
+
+ public MyReader(int limit, long waitMs) {
+ this.limit = limit;
+ this.waitMs = waitMs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ result = group.read(limit, waitMs);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Waits for the thread to complete.
+ *
+ * @return {@code true} if the thread completed, {@code false} if the thread is
+ * still running
+ */
+ public boolean await() {
+ try {
+ this.join(WAIT_MS);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return !this.isAlive();
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java
new file mode 100644
index 000000000..f8c141614
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java
@@ -0,0 +1,287 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+public class DmaapSimProviderTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final long SWEEP_SEC = 10L;
+ private static final String TOPIC1 = "topic-A";
+ private static final String TOPIC2 = "topic-B";
+ private static final String CONSUMER1 = "consumer-X";
+ private static final String CONSUMER_ID1 = "id1";
+
+ private MyProvider prov;
+
+ @Mock
+ private DmaapSimParameterGroup params;
+
+ @Mock
+ private ScheduledExecutorService timer;
+
+ @Mock
+ private TopicData data1;
+
+ @Mock
+ private TopicData data2;
+
+ @Captor
+ private ArgumentCaptor<List<Object>> listCaptor;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
+
+ prov = new MyProvider(params);
+ }
+
+ /**
+ * Shuts down the provider, if it's running.
+ */
+ @After
+ public void tearDown() {
+ if (prov.isAlive()) {
+ prov.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that the constructor adds all of the expected actions to the service
+ * manager container.
+ */
+ @Test
+ public void testDmaapSimProvider() {
+ prov.start();
+ verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
+
+ prov.stop();
+ verify(timer).shutdown();
+ }
+
+ @Test
+ public void testProcessDmaapMessagePut_List() throws CoderException {
+ prov = spy(new MyProvider(params));
+
+ when(data1.write(any())).thenReturn(2);
+
+ // force topics to exist
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+ List<Object> lst = Arrays.asList("hello", "world");
+ Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+ StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+ assertEquals("2", sco.getString("count"));
+
+ List<Object> lst2 = Arrays.asList("helloB", "worldB");
+ prov.processDmaapMessagePut(TOPIC1, lst2);
+ prov.processDmaapMessagePut(TOPIC2, lst2);
+
+ // should only invoke this once for each topic
+ verify(prov).makeTopicData(TOPIC1);
+ verify(prov).makeTopicData(TOPIC2);
+
+ // should process all writes
+ verify(data1).write(lst);
+ verify(data1).write(lst2);
+
+ verify(data2).write(lst2);
+ }
+
+ @Test
+ public void testProcessDmaapMessagePut_Single() throws CoderException {
+ prov = spy(new MyProvider(params));
+
+ // force topics to exist
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+ final String value1 = "abc";
+ Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+
+ // ensure that the response can be decoded
+ new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+
+ final String value2 = "def";
+ prov.processDmaapMessagePut(TOPIC1, value2);
+ prov.processDmaapMessagePut(TOPIC2, value2);
+
+ // should only invoke this once for each topic
+ verify(prov).makeTopicData(TOPIC1);
+ verify(prov).makeTopicData(TOPIC2);
+
+ // should process all writes as singleton lists
+ listCaptor.getAllValues().clear();
+ verify(data1, times(2)).write(listCaptor.capture());
+ assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
+ assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
+
+ listCaptor.getAllValues().clear();
+ verify(data2).write(listCaptor.capture());
+ assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet() throws InterruptedException {
+ List<String> msgs = Arrays.asList("400", "500");
+ when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
+
+ Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(msgs.toString(), resp.getEntity().toString());
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
+ when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+ Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
+ assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
+ BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
+
+ // put in a background thread so it doesn't interrupt the tester thread
+ new Thread(() -> {
+ try {
+ when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
+ respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
+
+ Response resp = respQueue.poll(3, TimeUnit.SECONDS);
+ assertNotNull(resp);
+
+ assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+ }
+
+ @Test
+ public void testSweepTopicTaskRun() {
+ prov.start();
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
+
+ captor.getValue().run();
+ verify(data1).removeIdleConsumers();
+ verify(data2).removeIdleConsumers();
+
+ // run it again
+ captor.getValue().run();
+ verify(data1, times(2)).removeIdleConsumers();
+ verify(data2, times(2)).removeIdleConsumers();
+ }
+
+ @Test
+ public void testMakeTimerPool() {
+ // use a real provider so we can test the real makeTimer() method
+ DmaapSimProvider prov2 = new DmaapSimProvider(params);
+ prov2.start();
+ prov2.stop();
+ }
+
+ @Test
+ public void testMakeTopicData() {
+ // use a real provider so we can test the real makeTopicData() method
+ DmaapSimProvider prov2 = new DmaapSimProvider(params);
+ prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+ }
+
+ @Test
+ public void testGetInstance_testSetInstance() {
+ DmaapSimProvider.setInstance(prov);
+ assertSame(prov, DmaapSimProvider.getInstance());
+
+ DmaapSimProvider.setInstance(null);
+ assertNull(DmaapSimProvider.getInstance());
+ }
+
+
+ public class MyProvider extends DmaapSimProvider {
+
+ public MyProvider(DmaapSimParameterGroup params) {
+ super(params);
+ }
+
+ @Override
+ protected ScheduledExecutorService makeTimerPool() {
+ return timer;
+ }
+
+ @Override
+ protected TopicData makeTopicData(String topicName) {
+ switch (topicName) {
+ case TOPIC1:
+ return data1;
+ case TOPIC2:
+ return data2;
+ default:
+ throw new IllegalArgumentException("unknown topic name: " + topicName);
+ }
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java
new file mode 100644
index 000000000..f7e1f5e6c
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java
@@ -0,0 +1,213 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+public class TopicDataTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String GROUP1 = "group-A";
+ private static final String GROUP2 = "group-B";
+ private static final String GROUP3 = "group-C";
+
+ private TopicData data;
+ private ConsumerGroupData consgrp1;
+ private ConsumerGroupData consgrp2;
+ private ConsumerGroupData consgrp3;
+ private List<ConsumerGroupData> groups;
+
+ /**
+ * Sets up mocks and the initial data object.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ consgrp1 = mock(ConsumerGroupData.class);
+ consgrp2 = mock(ConsumerGroupData.class);
+ consgrp3 = mock(ConsumerGroupData.class);
+
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+ when(consgrp2.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+ when(consgrp3.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+ groups = new LinkedList<>(Arrays.asList(consgrp1, consgrp2, consgrp3));
+
+ data = new TopicData("my-topic") {
+ @Override
+ protected ConsumerGroupData makeData(String consumerGroup) {
+ return groups.remove(0);
+ }
+ };
+ }
+
+ @Test
+ public void testRemoveIdleConsumers() throws Exception {
+ // force two consumers into the map
+ data.read(GROUP1, 0, 0);
+ data.read(GROUP2, 0, 0);
+ data.read(GROUP3, 0, 0);
+
+ // indicate that one should be removed
+ when(consgrp1.shouldRemove()).thenReturn(true);
+
+ // sweep
+ data.removeIdleConsumers();
+
+ assertEquals("[group-B, group-C]", new TreeSet<>(getGroups().keySet()).toString());
+
+ // indicate that the others should be removed
+ when(consgrp2.shouldRemove()).thenReturn(true);
+ when(consgrp3.shouldRemove()).thenReturn(true);
+
+ // sweep
+ data.removeIdleConsumers();
+
+ assertTrue(getGroups().isEmpty());
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ List<String> lst = Collections.emptyList();
+
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(ConsumerGroupData.UNREADABLE_LIST)
+ .thenReturn(ConsumerGroupData.UNREADABLE_LIST).thenReturn(lst);
+
+ assertSame(lst, data.read(GROUP1, 10, 20));
+
+ // should have invoked three times
+ verify(consgrp1, times(3)).read(anyInt(), anyLong());
+
+ // should have used the given values
+ verify(consgrp1, times(3)).read(10, 20);
+
+ // should not have allocated more than one group
+ assertEquals(2, groups.size());
+ }
+
+ @Test
+ public void testRead_MultipleGroups() throws Exception {
+ List<String> lst1 = Collections.emptyList();
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(lst1);
+
+ List<String> lst2 = Collections.emptyList();
+ when(consgrp2.read(anyInt(), anyLong())).thenReturn(lst2);
+
+ // one from each group
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // repeat
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // again
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // should still have group3 in the list
+ assertEquals(1, groups.size());
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ // no groups yet
+ List<Object> messages = Arrays.asList("hello", "world");
+ data.write(messages);
+
+ // add two groups
+ data.read(GROUP1, 0, 0);
+ data.read(GROUP2, 0, 0);
+
+ data.write(messages);
+
+ // should have been written to both groups
+ List<String> strings = messages.stream().map(Object::toString).collect(Collectors.toList());
+ verify(consgrp1).write(strings);
+ verify(consgrp2).write(strings);
+ }
+
+ @Test
+ public void testConvertMessagesToStrings() {
+ assertEquals("[abc, 200]", data.convertMessagesToStrings(Arrays.asList("abc", null, 200)).toString());
+ }
+
+ @Test
+ public void testConvertMessageToString() throws CoderException {
+ Coder coder = new StandardCoder();
+
+ assertNull(data.convertMessageToString(null, coder));
+ assertEquals("text-msg", data.convertMessageToString("text-msg", coder));
+ assertEquals("100", data.convertMessageToString(100, coder));
+
+ coder = mock(Coder.class);
+ when(coder.encode(any())).thenThrow(new CoderException(EXPECTED_EXCEPTION));
+ assertNull(data.convertMessageToString(new TreeMap<String,Object>(), coder));
+ }
+
+ @Test
+ public void testMakeData() throws Exception {
+ // use real objects instead of mocks
+ TopicData data2 = new TopicData("real-data-topic");
+
+ // force a group into the topic
+ data2.read(GROUP1, 0, 0);
+
+ data2.write(Arrays.asList("abc", "def", "ghi"));
+
+ assertEquals("[abc, def]", data2.read(GROUP1, 2, 0).toString());
+ }
+
+ /**
+ * Gets the consumer group map from the topic data object.
+ *
+ * @return the topic's consumer group map
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, ConsumerGroupData> getGroups() {
+ return (Map<String, ConsumerGroupData>) Whitebox.getInternalState(data, "group2data");
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java
new file mode 100644
index 000000000..5cba78355
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BaseRestControllerV1Test {
+
+ private BaseRestControllerV1 ctlr;
+ private ResponseBuilder bldr;
+
+ @Before
+ public void setUp() {
+ ctlr = new BaseRestControllerV1();
+ bldr = Response.status(Response.Status.OK);
+ }
+
+ @Test
+ public void testAddVersionControlHeaders() {
+ Response resp = ctlr.addVersionControlHeaders(bldr).build();
+ assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_MINOR_NAME));
+ assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_PATCH_NAME));
+ assertEquals("1.0.0", resp.getHeaderString(BaseRestControllerV1.VERSION_LATEST_NAME));
+ }
+
+ @Test
+ public void testAddLoggingHeaders_Null() {
+ Response resp = ctlr.addLoggingHeaders(bldr, null).build();
+ assertNotNull(resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+ }
+
+ @Test
+ public void testAddLoggingHeaders_NonNull() {
+ UUID uuid = UUID.randomUUID();
+ Response resp = ctlr.addLoggingHeaders(bldr, uuid).build();
+ assertEquals(uuid.toString(), resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java
new file mode 100644
index 000000000..5d9186c75
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CambriaMessageBodyHandlerTest {
+ private static final String STD_INPUT = "1.3.XAbc";
+ private static final String EXPECTED_OUTPUT = "[Abc]";
+
+ private CambriaMessageBodyHandler hdlr;
+
+ @Before
+ public void setUp() {
+ hdlr = new CambriaMessageBodyHandler();
+ }
+
+ @Test
+ public void testIsReadable() {
+ assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("application/cambria")));
+
+ assertFalse(hdlr.isReadable(null, null, null, null));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("application/other")));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/cambria")));
+ }
+
+ @Test
+ public void testReadFrom() throws IOException {
+ List<Object> lst = readStream("1.11.AMessageBody", "3.3.123Foo3.3.123Bar", "0.16.You can do that..8.Or that.");
+ assertEquals("[MessageBody, Foo, Bar, You can do that., Or that.]", lst.toString());
+
+ // empty stream
+ lst = readStream();
+ assertEquals("[]", lst.toString());
+ }
+
+ @Test
+ public void testReadMessage_InvalidPartitionLength() {
+ assertThatThrownBy(() -> readStream("100000000.3.")).isInstanceOf(IOException.class)
+ .hasMessage("invalid partition length");
+ }
+
+ @Test
+ public void testReadMessage_InvalidMessageLength() {
+ assertThatThrownBy(() -> readStream("3.100000000.ABC")).isInstanceOf(IOException.class)
+ .hasMessage("invalid message length");
+ }
+
+ @Test
+ public void testSkipWhitespace() throws IOException {
+ // no white space
+ assertEquals(EXPECTED_OUTPUT, readStream(STD_INPUT).toString());
+
+ // single white space
+ assertEquals(EXPECTED_OUTPUT, readStream(" " + STD_INPUT).toString());
+
+ // multiple white spaces
+ assertEquals(EXPECTED_OUTPUT, readStream("\n\n\t" + STD_INPUT).toString());
+ }
+
+ @Test
+ public void testReadLength_NoDigits() throws IOException {
+ assertEquals("[]", readStream("..").toString());
+ }
+
+ @Test
+ public void testReadLength_NoDot() {
+ assertThatThrownBy(() -> readStream("3.2")).isInstanceOf(EOFException.class)
+ .hasMessage("missing '.' in 'length' field");
+ }
+
+ @Test
+ public void testReadLength_NonDigit() {
+ assertThatThrownBy(() -> readStream("3.2x.ABCde")).isInstanceOf(IOException.class)
+ .hasMessage("invalid character in 'length' field");
+ }
+
+ @Test
+ public void testReadLength_TooManyDigits() {
+ assertThatThrownBy(() -> readStream("3.12345678901234567890.ABCde")).isInstanceOf(IOException.class)
+ .hasMessage("too many digits in 'length' field");
+ }
+
+ @Test
+ public void testReadString_ZeroLength() throws IOException {
+ assertEquals("[]", readStream("1..X").toString());
+ }
+
+ @Test
+ public void testReadString_TooShort() {
+ assertThatThrownBy(() -> readStream(".5.me")).isInstanceOf(EOFException.class).hasMessageContaining("actual");
+ }
+
+ /**
+ * Reads a stream via the handler.
+ *
+ * @param text lines of text to be read
+ * @return the list of objects that were decoded from the stream
+ * @throws IOException if an error occurs
+ */
+ private List<Object> readStream(String... text) throws IOException {
+ return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+ }
+
+ /**
+ * Creates an input stream from lines of text.
+ *
+ * @param text lines of text
+ * @return an input stream
+ */
+ private InputStream makeStream(String... text) {
+ return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java
new file mode 100644
index 000000000..7e30c5a4c
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java
@@ -0,0 +1,181 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.charset.StandardCharsets;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import lombok.Getter;
+import org.glassfish.jersey.client.ClientProperties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base class for rest server tests.
+ */
+public class CommonRestServer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CommonRestServer.class);
+
+ public static final String NOT_ALIVE = "not alive";
+ public static final String ALIVE = "alive";
+ public static final String SELF = "self";
+ public static final String NAME = "DMaaP Simulator";
+ public static final String ENDPOINT_PREFIX = "events/";
+
+ @Getter
+ private static int port;
+
+ protected static String httpPrefix;
+
+ private static Main main;
+
+ /**
+ * Allocates a port for the server, writes a config file, and then starts Main.
+ *
+ * @throws Exception if an error occurs
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ port = NetworkUtil.allocPort();
+
+ httpPrefix = "http://localhost:" + port + "/";
+
+ String json = new CommonTestData().getParameterGroupAsString(port);
+ makeConfigFile("src/test/resources/parameters/TestConfigParams.json", json);
+
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+ startMain();
+ }
+
+ /**
+ * Stops Main.
+ */
+ @AfterClass
+ public static void teardownAfterClass() {
+ try {
+ if (main != null) {
+ Main main2 = main;
+ main = null;
+
+ main2.shutdown();
+ }
+
+ } catch (DmaapSimException exp) {
+ LOGGER.error("cannot stop main", exp);
+ }
+ }
+
+ /**
+ * Set up.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ // restart, if not currently running
+ if (main == null) {
+ startMain();
+ }
+ }
+
+ /**
+ * Makes a parameter configuration file.
+ * @param fileName name of the config file to be created
+ * @param json json to be written to the file
+ *
+ * @throws Exception if an error occurs
+ */
+ protected static void makeConfigFile(String fileName, String json) throws Exception {
+ File file = new File(fileName);
+ file.deleteOnExit();
+
+ try (FileOutputStream output = new FileOutputStream(file)) {
+ output.write(json.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ /**
+ * Starts the "Main".
+ *
+ * @throws Exception if an error occurs
+ */
+ private static void startMain() throws Exception {
+ Registry.newRegistry();
+
+ // make sure port is available
+ if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
+ throw new IllegalStateException("port " + port + " is still in use");
+ }
+
+ final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
+
+ main = new Main(simConfigParameters);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", port, 60, 1000L)) {
+ throw new IllegalStateException("server is not listening on port " + port);
+ }
+ }
+
+ /**
+ * Sends a request to an endpoint.
+ *
+ * @param endpoint the target endpoint
+ * @return a request builder
+ * @throws Exception if an error occurs
+ */
+ protected Invocation.Builder sendRequest(final String endpoint) throws Exception {
+ return sendFqeRequest(httpPrefix + ENDPOINT_PREFIX + endpoint);
+ }
+
+ /**
+ * Sends a request to a fully qualified endpoint.
+ *
+ * @param fullyQualifiedEndpoint the fully qualified target endpoint
+ * @return a request builder
+ */
+ protected Invocation.Builder sendFqeRequest(final String fullyQualifiedEndpoint) {
+ final Client client = ClientBuilder.newBuilder().build();
+
+ client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true");
+ client.register(GsonMessageBodyHandler.class);
+
+ final WebTarget webTarget = client.target(fullyQualifiedEndpoint);
+
+ return webTarget.request(MediaType.APPLICATION_JSON);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java
new file mode 100644
index 000000000..7b84d543d
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
+
+public class DmaapSimRestControllerV1Test {
+ private static final int LIMIT = 5;
+ private static final String TOPIC = "my-topic";
+ private static final String TOPIC2 = "my-topic-B";
+ private static final String MESSAGE = "my-message";
+ private static final String MESSAGE2 = "my-message-B";
+ private static final String CONSUMER = "my-consumer";
+ private static final String CONSUMER_ID = "my-id";
+
+ private static Coder coder = new StandardCoder();
+
+ private DmaapSimRestControllerV1 rest;
+
+ /**
+ * Creates the controller.
+ *
+ * @throws CoderException if the parameters cannot be loaded
+ */
+ @Before
+ public void setUp() throws CoderException {
+ DmaapSimParameterGroup params = coder.decode(new File("src/test/resources/parameters/NormalParameters.json"),
+ DmaapSimParameterGroup.class);
+ DmaapSimProvider.setInstance(new DmaapSimProvider(params));
+ rest = new DmaapSimRestControllerV1();
+ }
+
+ @Test
+ public void test() {
+ Response resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+
+ // add some messages
+ resp = rest.postDmaapMessage(TOPIC, Arrays.asList(MESSAGE, MESSAGE2));
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(2, getCount(resp));
+
+ resp = rest.postDmaapMessage(TOPIC2, Arrays.asList(MESSAGE, MESSAGE2, MESSAGE));
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(3, getCount(resp));
+
+ // hadn't registered with topic 2 so nothing expected from there
+ resp = rest.getDmaapMessage(TOPIC2, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+
+ // now read from topic 1
+ resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[my-message, my-message-B]", resp.getEntity().toString());
+ }
+
+ private int getCount(Response resp) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) resp.getEntity();
+
+ return (int) map.get("count");
+ }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java
new file mode 100644
index 000000000..2dfbae980
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TextMessageBodyHandlerTest {
+ private TextMessageBodyHandler hdlr;
+
+ @Before
+ public void setUp() {
+ hdlr = new TextMessageBodyHandler();
+ }
+
+ @Test
+ public void testIsReadable() {
+ assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("text/plain")));
+
+ assertFalse(hdlr.isReadable(null, null, null, null));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("text/other")));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/plain")));
+ }
+
+ @Test
+ public void testReadFrom() throws IOException {
+ List<Object> lst = readStream("hello", "world");
+ assertEquals("[hello, world]", lst.toString());
+
+ // empty stream
+ lst = readStream();
+ assertEquals("[]", lst.toString());
+ }
+
+ /**
+ * Reads a stream via the handler.
+ *
+ * @param text lines of text to be read
+ * @return the list of objects that were decoded from the stream
+ * @throws IOException if an error occurs
+ */
+ private List<Object> readStream(String... text) throws IOException {
+ return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+ }
+
+ /**
+ * Creates an input stream from lines of text.
+ *
+ * @param text lines of text
+ * @return an input stream
+ */
+ private InputStream makeStream(String... text) {
+ return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
new file mode 100644
index 000000000..8c35de64e
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
@@ -0,0 +1,199 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.e2e;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
+
+/**
+ * This tests the simulator using dmaap endpoints to verify that it works from publisher
+ * to subscriber.
+ */
+public class EndToEndTest extends CommonRestServer {
+ private static final int MAX_WAIT_SEC = 5;
+ private static final String ORIG_TOPIC = "MY-TOPIC";
+ private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
+ private static final int MAX_MSG = 200;
+
+ private static int ntests = 0;
+ private static String topicJson;
+
+ private TopicParameterGroup topicConfig;
+
+ private String topic = "MY-TOPIC";
+ private String topic2 = "MY-TOPIC-B";
+
+ /**
+ * Messages from the topic are placed here by the endpoint.
+ */
+ private BlockingQueue<String> queue;
+
+ /**
+ * Messages from topic 2 are placed here by the endpoint.
+ */
+ private BlockingQueue<String> queue2;
+
+ /**
+ * Starts the rest server.
+ *
+ * @throws Exception if an error occurs
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TopicEndpointManager.getManager().shutdown();
+
+ CommonRestServer.setUpBeforeClass();
+
+ topicJson = new String(
+ Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
+ StandardCharsets.UTF_8);
+ topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
+ }
+
+ /**
+ * Starts the topics.
+ *
+ * @throws CoderException if the parameters cannot be decoded
+ */
+ @Before
+ @Override
+ public void setUp() throws CoderException {
+ queue = new LinkedBlockingQueue<>();
+ queue2 = new LinkedBlockingQueue<>();
+
+ /*
+ * change topic names for each test so any listeners that may still exist will not
+ * grab new messages
+ */
+
+ ++ntests;
+ topic = "my-topic-" + ntests;
+ topic2 = "my-topic-b" + ntests;
+
+ String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
+
+ topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
+
+ TopicEndpointManager.getManager().addTopics(topicConfig);
+ TopicEndpointManager.getManager().start();
+ }
+
+ @After
+ public void tearDown() {
+ TopicEndpointManager.getManager().shutdown();
+ }
+
+ @Test
+ public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
+ // register listeners to add events to appropriate queue
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+ .register((infra, topic, event) -> queue.add(event));
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
+ .register((infra, topic, event) -> queue2.add(event));
+
+ // publish events
+ TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
+ TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
+ for (int x = 0; x < MAX_MSG; ++x) {
+ sink.send("hello-" + x);
+ sink2.send("world-" + x);
+ }
+
+ // verify events where received
+ for (int x = 0; x < MAX_MSG; ++x) {
+ assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testCambriaFormat() throws Exception {
+ test("testCambriaFormat", "application/cambria",
+ (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
+ }
+
+ @Test
+ public void testJson() throws Exception {
+ test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
+ }
+
+ @Test
+ public void testText() throws Exception {
+ test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
+ }
+
+ /**
+ * Uses a raw URL connection to ensure the server can process messages of the given
+ * media type.
+ *
+ * @param testName name of the test
+ * @param mediaType media type
+ * @param writeMessages function that writes messages to a PrintWriter
+ * @throws Exception if an error occurs
+ */
+ private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
+ throws Exception {
+ String msg1 = "{'abc':10.0}".replace('\'', '"');
+ String msg2 = "{'def':20.0}".replace('\'', '"');
+
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+ .register((infra, topic, event) -> queue.add(event));
+
+ TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
+ URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ conn.setRequestProperty("Content-type", mediaType);
+ conn.setDoOutput(true);
+ conn.connect();
+
+ try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
+ writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
+ }
+
+ assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
+
+ assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java
new file mode 100644
index 000000000..21d9ed604
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+/**
+ * Class to hold/create all parameters for test cases.
+ */
+public class CommonTestData {
+ public static final String SIM_GROUP_NAME = "DMaapSim";
+
+ private static final Coder coder = new StandardCoder();
+
+ /**
+ * Gets the standard simulator parameters.
+ *
+ * @param port port to be inserted into the parameters
+ * @return the standard simulator parameters
+ */
+ public DmaapSimParameterGroup getParameterGroup(int port) {
+ try {
+ return coder.decode(getParameterGroupAsString(port), DmaapSimParameterGroup.class);
+
+ } catch (CoderException e) {
+ throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+ }
+ }
+
+ /**
+ * Gets the standard simulator parameters, as a String.
+ *
+ * @param port port to be inserted into the parameters
+ * @return the standard simulator parameters
+ */
+ public String getParameterGroupAsString(int port) {
+
+ try {
+ File file = new File("src/test/resources/parameters/NormalParameters.json");
+ String json = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
+
+ json = json.replace("6845", String.valueOf(port));
+
+ return json;
+
+ } catch (IOException e) {
+ throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+ }
+ }
+
+ /**
+ * Nulls out a field within a JSON string. It does it by adding a field with the same
+ * name, having a null value, and then prefixing the original field name with "Xxx",
+ * thus causing the original field and value to be ignored.
+ *
+ * @param json JSON string
+ * @param field field to be nulled out
+ * @return a new JSON string with the field nulled out
+ */
+ public String nullifyField(String json, String field) {
+ return json.replace(field + "\"", field + "\":null, \"" + field + "Xxx\"");
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java
index 1682eb991..828cd89b0 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java
@@ -1,6 +1,6 @@
-/*
+/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,17 +16,19 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.models.sim.dmaap;
+package org.onap.policy.sim.dmaap.parameters;
-/**
- * Names of various items contained in the Registry.
- */
-public class DmaapSimConstants {
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
- // Registry keys
- public static final String REG_DMAAP_SIM_ACTIVATOR = "object:activator/dmaap-sim";
+public class DmaapSimParameterGroupTest {
+ private static final String MY_NAME = "my-name";
- private DmaapSimConstants() {
- super();
+ @Test
+ public void testDmaapSimParameterGroup() {
+ DmaapSimParameterGroup params = new DmaapSimParameterGroup(MY_NAME);
+ assertEquals(MY_NAME, params.getName());
}
}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java
new file mode 100644
index 000000000..8f053d219
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+public class DmaapSimParameterHandlerTest {
+
+ private static final String RESOURCE_DIR = "src/test/resources/parameters/";
+
+ private DmaapSimParameterHandler handler;
+
+ @Before
+ public void setUp() {
+ handler = new DmaapSimParameterHandler();
+ }
+
+ @Test
+ public void testGetParameters() throws DmaapSimException {
+ final DmaapSimCommandLineArguments args = new DmaapSimCommandLineArguments();
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "NormalParameters.json"});
+ DmaapSimParameterGroup params = handler.getParameters(args);
+ assertNotNull(params);
+ assertEquals("DMaapSim", params.getName());
+ assertEquals(300L, params.getTopicSweepSec());
+ assertEquals(6845, params.getRestServerParameters().getPort());
+
+
+ args.parse(new String[] {"-c", "FileNotFound.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageStartingWith("error reading parameters");
+
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "EmptyParameterFile.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageStartingWith("no parameters found");
+
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "Parameters_InvalidName.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageContaining("validation error");
+ }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java
new file mode 100644
index 000000000..380a72423
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimActivator;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+
+/**
+ * Class to perform unit test of {@link DmaapSimActivator}}.
+ */
+public class DmaapSimActivatorTest {
+
+ private DmaapSimActivator activator;
+
+ /**
+ * Initializes an activator.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ Registry.newRegistry();
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+ final String[] papConfigParameters = {"-c", "parameters/NormalParameters.json"};
+ final DmaapSimCommandLineArguments arguments = new DmaapSimCommandLineArguments(papConfigParameters);
+ final DmaapSimParameterGroup parGroup = new DmaapSimParameterHandler().getParameters(arguments);
+
+ activator = new DmaapSimActivator(parGroup);
+ }
+
+ /**
+ * Method for cleanup after each test.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void teardown() throws Exception {
+ if (activator != null && activator.isAlive()) {
+ activator.stop();
+ }
+ }
+
+ @Test
+ public void testDmaapSimActivator() {
+ assertFalse(activator.isAlive());
+ activator.start();
+ assertTrue(activator.isAlive());
+
+ // repeat - should throw an exception
+ assertThatIllegalStateException().isThrownBy(() -> activator.start());
+ assertTrue(activator.isAlive());
+ }
+
+ @Test
+ public void testTerminate() {
+ activator.start();
+ activator.stop();
+ assertFalse(activator.isAlive());
+
+ // repeat - should throw an exception
+ assertThatIllegalStateException().isThrownBy(() -> activator.stop());
+ assertFalse(activator.isAlive());
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java
new file mode 100644
index 000000000..b8e285a99
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+
+/**
+ * Class to perform unit test of {@link Main}}.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class MainTest {
+ private Main main;
+
+ /**
+ * Set up.
+ */
+ @Before
+ public void setUp() {
+ main = null;
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+ }
+
+ /**
+ * Shuts "main" down.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void tearDown() throws Exception {
+ if (main != null) {
+ main.shutdown();
+ }
+ }
+
+ @Test
+ public void testMain() throws DmaapSimException {
+ final String[] NormalParameters = {"-c", "parameters/NormalParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters().isValid());
+ assertEquals(CommonTestData.SIM_GROUP_NAME, main.getParameters().getName());
+
+ main.shutdown();
+ }
+
+ @Test
+ public void testMain_NoArguments() {
+ final String[] NormalParameters = {};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+
+ @Test
+ public void testMain_InvalidArguments() {
+ // note: this is missing the "-c" argument, thus the ARGUMENTS are invalid
+ final String[] NormalParameters = {"parameters/NormalParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+
+ @Test
+ public void testMain_Help() {
+ final String[] NormalParameters = {"-h"};
+ Main.main(NormalParameters);
+ }
+
+ @Test
+ public void testMain_InvalidParameters() {
+ final String[] NormalParameters = {"-c", "parameters/InvalidParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
index aeedf9d6e..a1c98a5b1 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
@@ -1,5 +1,6 @@
-{
+{
"name":"DMaapSim",
+ "topicSweepSec": 1,
"restServerParameters":{
"host":"0.0.0.0",
"port":6845
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
index a2a036645..deec966e8 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
@@ -1,5 +1,6 @@
{
"name": "DMaapSim",
+ "topicSweepSec": 300,
"restServerParameters": {
"host": "0.0.0.0",
"port": 6845
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
index fba033e52..51e9458b0 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
@@ -1,5 +1,6 @@
{
"name":" ",
+ "topicSweepSec": 1,
"restServerParameters":{
"host":"0.0.0.0",
"port":6969,
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json
new file mode 100644
index 000000000..77a320f6d
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json
@@ -0,0 +1,36 @@
+{
+ "topicSources": [
+ {
+ "topic": "MY-TOPIC",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap",
+ "fetchTimeout": 100
+ },
+ {
+ "topic": "MY-TOPIC-B",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap",
+ "fetchTimeout": 100
+ }
+ ],
+ "topicSinks": [
+ {
+ "topic": "MY-TOPIC",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ },
+ {
+ "topic": "MY-TOPIC-B",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ]
+} \ No newline at end of file