summaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'models-sim/models-sim-dmaap/src/main')
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java32
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java11
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java5
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java6
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java190
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java215
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java201
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java170
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java76
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java100
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java63
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java66
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java25
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java40
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java13
-rw-r--r--models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json1
16 files changed, 779 insertions, 435 deletions
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java
deleted file mode 100644
index 1682eb991..000000000
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.models.sim.dmaap;
-
-/**
- * Names of various items contained in the Registry.
- */
-public class DmaapSimConstants {
-
- // Registry keys
- public static final String REG_DMAAP_SIM_ACTIVATOR = "object:activator/dmaap-sim";
-
- private DmaapSimConstants() {
- super();
- }
-}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
index caae287b8..11da57582 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +22,8 @@
package org.onap.policy.models.sim.dmaap.parameters;
import lombok.Getter;
-
import org.onap.policy.common.parameters.ParameterGroupImpl;
+import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
@@ -36,6 +37,14 @@ public class DmaapSimParameterGroup extends ParameterGroupImpl {
private RestServerParameters restServerParameters;
/**
+ * Frequency, in seconds, with which to sweep the topics of idle consumers. On each
+ * sweep cycle, if a consumer group has had no new poll requests since the last sweep
+ * cycle, it is removed.
+ */
+ @Min(1)
+ private long topicSweepSec;
+
+ /**
* Create the DMaaP simulator parameter group.
*
* @param name the parameter group name
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
index 8eb76ec00..252054504 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,7 +39,7 @@ public class DmaapSimParameterHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimParameterHandler.class);
- private static final Coder CODER = new StandardCoder();
+ private final Coder coder = new StandardCoder();
/**
* Read the parameters from the parameter file.
@@ -54,7 +55,7 @@ public class DmaapSimParameterHandler {
try {
// Read the parameters from JSON
File file = new File(arguments.getFullConfigurationFilePath());
- dmaapSimParameterGroup = CODER.decode(file, DmaapSimParameterGroup.class);
+ dmaapSimParameterGroup = coder.decode(file, DmaapSimParameterGroup.class);
} catch (final CoderException e) {
final String errorMessage = "error reading parameters from \"" + arguments.getConfigurationFilePath()
+ "\"\n" + "(" + e.getClass().getSimpleName() + "):" + e.getMessage();
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
index c7269f66d..8fb2cffa3 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,13 +22,14 @@
package org.onap.policy.models.sim.dmaap.parameters;
import lombok.Getter;
+
import org.onap.policy.common.parameters.ParameterGroupImpl;
import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
/**
- * Class to hold all parameters needed for DMaaP simulator rest server.
+ * Class to hold all parameters needed for rest server.
*/
@NotNull
@NotBlank
@@ -39,6 +41,6 @@ public class RestServerParameters extends ParameterGroupImpl {
private int port;
public RestServerParameters() {
- super("RestServerParameters");
+ super(RestServerParameters.class.getSimpleName());
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java
new file mode 100644
index 000000000..737151339
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java
@@ -0,0 +1,190 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a consumer group. All consumer instances within a group share the
+ * same data object.
+ */
+public class ConsumerGroupData {
+ private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupData.class);
+
+ /**
+ * Returned when messages can no longer be read from this consumer group object,
+ * because it is being removed from the topic. {@link #UNREADABLE_LIST} must not be
+ * the same list as Collections.emptyList(), thus we wrap it.
+ */
+ public static final List<String> UNREADABLE_LIST = Collections.unmodifiableList(Collections.emptyList());
+
+ /**
+ * Returned when there are no messages read. Collections.emptyList() is already
+ * unmodifiable, thus no need to wrap it.
+ */
+ private static final List<String> EMPTY_LIST = Collections.emptyList();
+
+ /**
+ * This is locked while fields other than {@link #messageQueue} are updated.
+ */
+ private final Object lockit = new Object();
+
+ /**
+ * Number of sweep cycles that have occurred since a consumer has attempted to read
+ * from the queue. This consumer group should be removed once this count exceeds
+ * {@code 1}, provided {@link #nreaders} is zero.
+ */
+ private int nsweeps = 0;
+
+ /**
+ * Number of consumers that are currently attempting to read from the queue. This
+ * consumer group should not be removed as long as this is non-zero.
+ */
+ private int nreaders = 0;
+
+ /**
+ * Message queue.
+ */
+ private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param topicName name of the topic with which this object is associated
+ * @param groupName name of the consumer group with which this object is associated
+ */
+ public ConsumerGroupData(String topicName, String groupName) {
+ logger.info("Topic {}: add consumer group: {}", topicName, groupName);
+ }
+
+ /**
+ * Determines if this consumer group should be removed. This should be invoked once
+ * during each sweep cycle. When this returns {@code true}, this consumer group should
+ * be immediately discarded, as any readers will sit in a spin loop waiting for it to
+ * be discarded.
+ *
+ * @return {@code true} if this consumer group should be removed, {@code false}
+ * otherwise
+ */
+ public boolean shouldRemove() {
+ synchronized (lockit) {
+ return (nreaders == 0 && ++nsweeps > 1);
+ }
+ }
+
+ /**
+ * Reads messages from the queue, blocking if necessary.
+ *
+ * @param maxRead maximum number of messages to read
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return a list of messages read from the queue, empty if no messages became
+ * available before the wait time elapsed, or {@link #UNREADABLE_LIST} if this
+ * consumer group object is no longer active
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * first message
+ */
+ public List<String> read(int maxRead, long waitMs) throws InterruptedException {
+
+ synchronized (lockit) {
+ if (nsweeps > 1 && nreaders == 0) {
+ // cannot use this consumer group object anymore
+ return UNREADABLE_LIST;
+ }
+
+ ++nreaders;
+ }
+
+ /*
+ * Note: do EVERYTHING inside of the "try" block, so that the "finally" block can
+ * update the reader count.
+ *
+ * Do NOT hold the lockit while we're polling, as poll() may block for a while.
+ */
+ try {
+ // always read at least one message
+ int maxRead2 = Math.max(1, maxRead);
+ long waitMs2 = Math.max(0, waitMs);
+
+ // perform a blocking read of the queue
+ String obj = getNextMessage(waitMs2);
+ if (obj == null) {
+ return EMPTY_LIST;
+ }
+
+ /*
+ * List should hold all messages from the queue PLUS the one we already have.
+ * Note: it's possible for additional messages to be added to the queue while
+ * we're reading from it. In that case, the list will grow as needed.
+ */
+ List<String> lst = new ArrayList<>(Math.min(maxRead2, messageQueue.size() + 1));
+ lst.add(obj);
+
+ // perform NON-blocking read of subsequent messages
+ for (int x = 1; x < maxRead2; ++x) {
+ if ((obj = messageQueue.poll()) == null) {
+ break;
+ }
+
+ lst.add(obj);
+ }
+
+ return lst;
+
+ } finally {
+ synchronized (lockit) {
+ --nreaders;
+ nsweeps = 0;
+ }
+ }
+ }
+
+ /**
+ * Writes messages to the queue.
+ *
+ * @param messages messages to be written to the queue
+ */
+ public void write(List<String> messages) {
+ messageQueue.addAll(messages);
+ }
+
+ // the following methods may be overridden by junit tests
+
+ /**
+ * Gets the next message from the queue.
+ *
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return the next message, or {@code null} if no messages became available before
+ * the wait time elapsed
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * message
+ */
+ protected String getNextMessage(long waitMs) throws InterruptedException {
+ return messageQueue.poll(waitMs, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
index 9de29cdac..d11d1b397 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,18 +21,20 @@
package org.onap.policy.models.sim.dmaap.provider;
+import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory;
*
* @author Liam Fallon (liam.fallon@est.tech)
*/
-public class DmaapSimProvider {
+public class DmaapSimProvider extends ServiceManagerContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
- // Recurring string constants
- private static final String TOPIC_TAG = "Topic:";
+ @Getter
+ @Setter
+ private static DmaapSimProvider instance;
- // Time for a get to wait before checking of a message has come
- private static final long DMAAP_SIM_WAIT_TIME = 50;
+ /**
+ * Maps a topic name to its data.
+ */
+ private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
- // recurring constants
- private static final String WITH_TIMEOUT = " with timeout ";
+ /**
+ * Thread used to remove idle consumers from the topics.
+ */
+ private ScheduledExecutorService timerPool;
- // The map of topic messages
- private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
- // The map of topic messages
- private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
- new LinkedHashMap<>();
+ /**
+ * Constructs the object.
+ *
+ * @param params parameters
+ */
+ public DmaapSimProvider(DmaapSimParameterGroup params) {
+ addAction("Topic Sweeper", () -> {
+ timerPool = makeTimerPool();
+ timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
+ TimeUnit.SECONDS);
+ }, () -> timerPool.shutdown());
+ }
/**
* Process a DMaaP message.
*
- * @param topicName The topic name
+ * @param topicName the topic name
* @param dmaapMessage the message to process
* @return a response to the message
*/
+ @SuppressWarnings("unchecked")
public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null) {
- messageMap = new TreeMap<>();
- topicMessageMap.put(topicName, messageMap);
- LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
- }
+ LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
- int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
+ List<Object> lst;
- messageMap.put(nextKey, dmaapMessage);
- LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+ if (dmaapMessage instanceof List) {
+ lst = (List<Object>) dmaapMessage;
+ } else {
+ lst = Collections.singletonList(dmaapMessage);
}
- return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
+ TopicData topic = topic2data.get(topicName);
+
+ /*
+ * Write all messages and return the count. If the topic doesn't exist yet, then
+ * there are no subscribers to receive the messages, thus treat it as if all
+ * messages were published.
+ */
+ int nmessages = (topic != null ? topic.write(lst) : lst.size());
+
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("serverTimeMs", 0);
+ map.put("count", nmessages);
+
+ return Response.status(Response.Status.OK).entity(map).build();
}
/**
@@ -92,102 +115,66 @@ public class DmaapSimProvider {
* @param topicName The topic to wait on
* @param consumerGroup the consumer group that is waiting
* @param consumerId the consumer ID that is waiting
- * @param timeout the length of time to wait for
+ * @param limit the maximum number of messages to get
+ * @param timeoutMs the length of time to wait for
* @return the DMaaP message or
*/
public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
- final int timeout) {
+ final int limit, final long timeoutMs) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
+ LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
+ consumerId, limit, timeoutMs);
- MutablePair<Integer, String> consumerGroupPair = null;
-
- synchronized (consumerGroupsMap) {
- Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
- if (consumerGroupMap == null) {
- consumerGroupMap = new LinkedHashMap<>();
- consumerGroupsMap.put(topicName, consumerGroupMap);
- LOGGER.trace(
- TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
- }
-
- consumerGroupPair = consumerGroupMap.get(consumerGroup);
- if (consumerGroupPair == null) {
- consumerGroupPair = new MutablePair<>(-1, consumerId);
- consumerGroupMap.put(consumerGroup, consumerGroupPair);
- LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
- + consumerGroup + ":" + consumerId);
- }
- }
+ try {
+ List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+ timeoutMs);
- long timeOfTimeout = System.currentTimeMillis() + timeout;
+ if (lst.isEmpty() && timeoutMs > 0) {
+ LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
+ return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
- do {
- Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
- if (waitingMessages != null) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
- return Response.status(Response.Status.OK).entity(waitingMessages).build();
+ } else {
+ LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
+ consumerId);
+ return Response.status(Status.OK).entity(lst).build();
}
- try {
- TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
- } catch (InterruptedException ie) {
- String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
- + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
- LOGGER.warn(errorMessage, ie);
- Thread.currentThread().interrupt();
- throw new DmaapSimRuntimeException(errorMessage, ie);
- }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
+ consumerId, e);
+ Thread.currentThread().interrupt();
+ return Response.status(Status.GONE).entity(Collections.emptyList()).build();
}
- while (timeOfTimeout > System.currentTimeMillis());
-
- LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
- return Response.status(Response.Status.REQUEST_TIMEOUT).build();
}
/**
- * Return any messages on this topic with a message number greater than the supplied message number.
- *
- * @param topicName the topic name to check
- * @param consumerGroupPair the pair with the information on the last message retrieved
- * @return the messages or null if there are none
+ * Task to remove idle consumers from each topic.
*/
- private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
- String foundMessageList = "[";
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
- return null;
- }
+ private class SweeperTask implements Runnable {
+ @Override
+ public void run() {
+ topic2data.values().forEach(TopicData::removeIdleConsumers);
+ }
+ }
- boolean first = true;
- for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
- if (first) {
- first = false;
- } else {
- foundMessageList += ",";
- }
- try {
- foundMessageList += new StandardCoder().encode(dmaapMessage);
- } catch (CoderException ce) {
- String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
- LOGGER.warn(errorMessage, ce);
- return null;
- }
- }
- foundMessageList += ']';
+ // the following methods may be overridden by junit tests
- LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
- + " to " + messageMap.lastKey());
- synchronized (consumerGroupsMap) {
- consumerGroupPair.setLeft(messageMap.lastKey());
- }
- }
+ /**
+ * Makes a new timer pool.
+ *
+ * @return a new timer pool
+ */
+ protected ScheduledExecutorService makeTimerPool() {
+ return Executors.newScheduledThreadPool(1);
+ }
- return (foundMessageList.length() < 3 ? null : foundMessageList);
+ /**
+ * Makes a new topic.
+ *
+ * @param topicName topic name
+ * @return a new topic
+ */
+ protected TopicData makeTopicData(String topicName) {
+ return new TopicData(topicName);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java
new file mode 100644
index 000000000..2737f455b
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java
@@ -0,0 +1,201 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a topic.
+ *
+ * <p/>
+ * Note: for ease of implementation, this adds a topic when a consumer polls it rather
+ * than when a publisher writes to it. This is the opposite of how the real DMaaP works.
+ * As a result, this will never return a topic-not-found message to the consumer.
+ */
+public class TopicData {
+ private static final Logger logger = LoggerFactory.getLogger(TopicData.class);
+
+ /**
+ * Name of the topic with which this data is associated.
+ */
+ private final String topicName;
+
+ /**
+ * Maps a consumer group name to its associated data.
+ */
+ private final Map<String, ConsumerGroupData> group2data = new ConcurrentHashMap<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param topicName name of the topic with which this object is associated
+ */
+ public TopicData(String topicName) {
+ logger.info("Topic {}: added", topicName);
+ this.topicName = topicName;
+ }
+
+ /**
+ * Removes idle consumers from the topic. This is typically called once during each
+ * sweep cycle.
+ */
+ public void removeIdleConsumers() {
+ Iterator<Entry<String, ConsumerGroupData>> iter = group2data.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String, ConsumerGroupData> ent = iter.next();
+ if (ent.getValue().shouldRemove()) {
+ /*
+ * We want the minimum amount of time to elapse between invoking
+ * shouldRemove() and iter.remove(), thus all other statements (e.g.,
+ * logging) should be done AFTER iter.remove().
+ */
+ iter.remove();
+
+ logger.info("Topic {}: removed consumer group: {}", topicName, ent.getKey());
+ }
+ }
+ }
+
+ /**
+ * Reads from a particular consumer group's queue.
+ *
+ * @param consumerGroup name of the consumer group from which to read
+ * @param maxRead maximum number of messages to read
+ * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+ * @return a list of messages read from the queue, empty if no messages became
+ * available before the wait time elapsed
+ * @throws InterruptedException if this thread was interrupted while waiting for the
+ * first message
+ */
+ public List<String> read(String consumerGroup, int maxRead, long waitMs) throws InterruptedException {
+ /*
+ * It's possible that this thread may spin several times while waiting for
+ * removeIdleConsumers() to complete its call to iter.remove(), thus we create
+ * this closure once, rather than each time through the loop.
+ */
+ Function<String, ConsumerGroupData> maker = this::makeData;
+
+ // loop until we get a readable list
+ List<String> result;
+
+ // @formatter:off
+
+ do {
+ result = group2data.computeIfAbsent(consumerGroup, maker).read(maxRead, waitMs);
+ }
+ while (result == ConsumerGroupData.UNREADABLE_LIST);
+
+ // @formatter:on
+
+ return result;
+ }
+
+ /**
+ * Writes messages to the queues of every consumer group.
+ *
+ * @param messages messages to be written to the queues
+ * @return the number of messages enqueued
+ */
+ public int write(List<Object> messages) {
+ List<String> list = convertMessagesToStrings(messages);
+
+ /*
+ * We don't care if a consumer group is deleted from the map while we're adding
+ * messages to it, as those messages will simply be ignored (and discarded by the
+ * garbage collector).
+ */
+ for (ConsumerGroupData data : group2data.values()) {
+ data.write(list);
+ }
+
+ return list.size();
+ }
+
+ /**
+ * Converts a list of message objects to a list of message strings. If a message
+ * cannot be converted for some reason, then it is not added to the result list, thus
+ * the result list may be shorted than the original input list.
+ *
+ * @param messages objects to be converted
+ * @return a list of message strings
+ */
+ protected List<String> convertMessagesToStrings(List<Object> messages) {
+ Coder coder = new StandardCoder();
+ List<String> list = new ArrayList<>(messages.size());
+
+ for (Object msg : messages) {
+ String str = convertMessageToString(msg, coder);
+ if (str != null) {
+ list.add(str);
+ }
+ }
+
+ return list;
+ }
+
+ /**
+ * Converts a message object to a message string.
+ *
+ * @param message message to be converted
+ * @param coder used to encode the message as a string
+ * @return the message string, or {@code null} if it cannot be converted
+ */
+ protected String convertMessageToString(Object message, Coder coder) {
+ if (message == null) {
+ return null;
+ }
+
+ if (message instanceof String) {
+ return message.toString();
+ }
+
+ try {
+ return coder.encode(message);
+ } catch (CoderException e) {
+ logger.warn("cannot encode {}", message, e);
+ return null;
+ }
+ }
+
+ // this may be overridden by junit tests
+
+ /**
+ * Makes data for a consumer group.
+ *
+ * @param consumerGroup name of the consumer group to make
+ * @return new consumer group data
+ */
+ protected ConsumerGroupData makeData(String consumerGroup) {
+ return new ConsumerGroupData(topicName, consumerGroup);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
index e269ac00b..dbd9422ef 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
@@ -1,66 +1,178 @@
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.models.sim.dmaap.rest;
import java.io.BufferedReader;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.Reader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
/**
- * Provider that serializes and de-serializes JSON via gson.
+ * Provider that decodes "application/cambria" messages.
*/
@Provider
@Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
-@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
public class CambriaMessageBodyHandler implements MessageBodyReader<Object> {
- // Media type for Cambria
public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria";
- public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class);
+ /**
+ * Maximum length of a message or partition.
+ */
+ private static final int MAX_LEN = 10000000;
+
+ /**
+ * Maximum digits in a length field.
+ */
+ private static final int MAX_DIGITS = 10;
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString());
+ return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()));
}
@Override
- public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
- MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
- throws IOException {
-
- String cambriaString = "";
- try (BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(entityStream))) {
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- cambriaString += line;
+ public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+ List<Object> messages = new LinkedList<>();
+ String msg;
+ while ((msg = readMessage(bufferedReader)) != null) {
+ messages.add(msg);
+ }
+
+ return messages;
+ }
+ }
+
+ /**
+ * Reads a message.
+ *
+ * @param reader source from which to read
+ * @return the message that was read, or {@code null} if there are no more messages
+ * @throws IOException if an error occurs
+ */
+ private String readMessage(Reader reader) throws IOException {
+ if (!skipWhitespace(reader)) {
+ return null;
+ }
+
+ int partlen = readLength(reader);
+ if (partlen > MAX_LEN) {
+ throw new IOException("invalid partition length");
+ }
+
+ int msglen = readLength(reader);
+ if (msglen > MAX_LEN) {
+ throw new IOException("invalid message length");
+ }
+
+ // skip over the partition
+ reader.skip(partlen);
+
+ return readString(reader, msglen);
+ }
+
+ /**
+ * Skips whitespace.
+ *
+ * @param reader source from which to read
+ * @return {@code true} if there is another character after the whitespace,
+ * {@code false} if the end of the stream has been reached
+ * @throws IOException if an error occurs
+ */
+ private boolean skipWhitespace(Reader reader) throws IOException {
+ int chr;
+
+ do {
+ reader.mark(1);
+ if ((chr = reader.read()) < 0) {
+ return false;
+ }
+ }
+ while (Character.isWhitespace(chr));
+
+ // push the last character back onto the reader
+ reader.reset();
+
+ return true;
+ }
+
+ /**
+ * Reads a length field, which is a number followed by ".".
+ *
+ * @param reader source from which to read
+ * @return the length, or -1 if EOF has been reached
+ * @throws IOException if an error occurs
+ */
+ private int readLength(Reader reader) throws IOException {
+ StringBuilder bldr = new StringBuilder(MAX_DIGITS);
+
+ int chr;
+ for (int x = 0; x < MAX_DIGITS; ++x) {
+ if ((chr = reader.read()) < 0) {
+ throw new EOFException("missing '.' in 'length' field");
}
- return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length());
+ if (chr == '.') {
+ String text = bldr.toString().trim();
+ return (text.isEmpty() ? 0 : Integer.valueOf(text));
+ }
+
+ if (!Character.isDigit(chr)) {
+ throw new IOException("invalid character in 'length' field");
+ }
+
+ bldr.append((char) chr);
}
+
+ throw new IOException("too many digits in 'length' field");
+ }
+
+ /**
+ * Reads a string.
+ *
+ * @param reader source from which to read
+ * @param len length of the string (i.e., number of characters to read)
+ * @return the string that was read
+ * @throws IOException if an error occurs
+ */
+ private String readString(Reader reader, int len) throws IOException {
+ char[] buf = new char[len];
+ IOUtils.readFully(reader, buf);
+
+ return new String(buf);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
index e3fdd4884..8339d0e64 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,24 +21,24 @@
package org.onap.policy.models.sim.dmaap.rest;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
-
import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
/**
* Class to provide REST endpoints for DMaaP simulator component statistics.
*/
@Path("/events")
+@Produces(DmaapSimRestControllerV1.MEDIA_TYPE_APPLICATION_JSON)
public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
+ public static final String MEDIA_TYPE_APPLICATION_JSON = "application/json";
/**
* Get a DMaaP message.
@@ -45,72 +46,33 @@ public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
* @param topicName topic to get message from
* @param consumerGroup consumer group that is getting the message
* @param consumerId consumer ID that is getting the message
- * @param timeout timeout for the message
+ * @param timeoutMs timeout for the message
* @return the message
*/
@GET
@Path("{topicName}/{consumerGroup}/{consumerId}")
- // @formatter:off
- @ApiOperation(
- value = "Get a DMaaP event on a topic",
- notes = "Returns an event on a DMaaP topic",
- response = Object.class,
- authorizations =
- @Authorization(value = AUTHORIZATION_TYPE)
- )
- @ApiResponses(
- value = {
- @ApiResponse(
- code = AUTHENTICATION_ERROR_CODE,
- message = AUTHENTICATION_ERROR_MESSAGE),
- @ApiResponse(
- code = AUTHORIZATION_ERROR_CODE,
- message = AUTHORIZATION_ERROR_MESSAGE),
- @ApiResponse(
- code = SERVER_ERROR_CODE,
- message = SERVER_ERROR_MESSAGE)
- }
- )
- // @formatter:on
- public Response getDmaaapMessage(@PathParam("topicName") final String topicName,
- @PathParam("consumerGroup") final String consumerGroup, @PathParam("consumerId") final String consumerId,
- @QueryParam("timeout") final int timeout) {
+ public Response getDmaapMessage(@PathParam("topicName") final String topicName,
+ @PathParam("consumerGroup") final String consumerGroup,
+ @PathParam("consumerId") final String consumerId,
+ @QueryParam("limit") @DefaultValue("1") final int limit,
+ @QueryParam("timeout") @DefaultValue("15000") final long timeoutMs) {
- return new DmaapSimProvider().processDmaapMessageGet(topicName, consumerGroup, consumerId, timeout);
+ return DmaapSimProvider.getInstance().processDmaapMessageGet(topicName, consumerGroup, consumerId, limit,
+ timeoutMs);
}
/**
* Post a DMaaP message.
*
- * @param topicName topic to get message from415
+ * @param topicName topic to get message from
* @return the response to the post
*/
@POST
@Path("{topicName}")
- // @formatter:off
- @ApiOperation(
- value = "Post a DMaaP event on a topic",
- notes = "Returns an event on a DMaaP topic",
- response = Response.class,
- authorizations =
- @Authorization(value = AUTHORIZATION_TYPE)
- )
- @ApiResponses(
- value = {
- @ApiResponse(
- code = AUTHENTICATION_ERROR_CODE,
- message = AUTHENTICATION_ERROR_MESSAGE),
- @ApiResponse(
- code = AUTHORIZATION_ERROR_CODE,
- message = AUTHORIZATION_ERROR_MESSAGE),
- @ApiResponse(
- code = SERVER_ERROR_CODE,
- message = SERVER_ERROR_MESSAGE)
- }
- )
- // @formatter:on
- public Response postDmaaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
+ @Consumes(value = {CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA,
+ TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN, MEDIA_TYPE_APPLICATION_JSON})
+ public Response postDmaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
- return new DmaapSimProvider().processDmaapMessagePut(topicName, dmaapMessage);
+ return DmaapSimProvider.getInstance().processDmaapMessagePut(topicName, dmaapMessage);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
index 28de42c21..b05a0fe1a 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
@@ -21,28 +21,21 @@
package org.onap.policy.models.sim.dmaap.rest;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
-import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
import org.onap.policy.models.sim.dmaap.parameters.RestServerParameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Class to manage life cycle of DMaaP Simulator rest server.
*/
-public class DmaapSimRestServer implements Startable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimRestServer.class);
+public class DmaapSimRestServer extends ServiceManagerContainer {
- private List<HttpServletServer> servers = new ArrayList<>();
-
- private RestServerParameters restServerParameters;
+ private final List<HttpServletServer> servers;
/**
* Constructor for instantiating DmaapSimRestServer.
@@ -50,91 +43,40 @@ public class DmaapSimRestServer implements Startable {
* @param restServerParameters the rest server parameters
*/
public DmaapSimRestServer(final RestServerParameters restServerParameters) {
- this.restServerParameters = restServerParameters;
- }
+ this.servers = HttpServletServerFactoryInstance.getServerFactory()
+ .build(getServerProperties(restServerParameters));
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean start() {
- try {
- servers = HttpServletServerFactoryInstance.getServerFactory().build(getServerProperties());
- for (final HttpServletServer server : servers) {
- server.start();
- }
- } catch (final Exception exp) {
- LOGGER.error("Failed to start DMaaP simulator http server", exp);
- return false;
+ for (HttpServletServer server : this.servers) {
+ addAction("REST " + server.getName(), server::start, server::stop);
}
- return true;
}
/**
- * Creates the server properties object using restServerParameters.
+ * Creates a set of properties, suitable for building a REST server, from the
+ * parameters.
*
- * @return the properties object
+ * @param restServerParameters parameters from which to build the properties
+ * @return a set of properties representing the given parameters
*/
- private Properties getServerProperties() {
+ public static Properties getServerProperties(RestServerParameters restServerParameters) {
final Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName());
final String svcpfx =
- PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
+ PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost());
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
- Integer.toString(restServerParameters.getPort()));
+ Integer.toString(restServerParameters.getPort()));
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- String.join(",", DmaapSimRestControllerV1.class.getName()));
+ DmaapSimRestControllerV1.class.getName());
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
- props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true");
+ props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
+
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
- CambriaMessageBodyHandler.class.getName() + "," + JsonMessageBodyHandler.class.getName());
+ String.join(",", CambriaMessageBodyHandler.class.getName(),
+ GsonMessageBodyHandler.class.getName(),
+ TextMessageBodyHandler.class.getName()));
return props;
}
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean stop() {
- for (final HttpServletServer server : servers) {
- try {
- server.stop();
- } catch (final Exception exp) {
- LOGGER.error("Failed to stop DMaaP simulator http server", exp);
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public void shutdown() {
- stop();
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public boolean isAlive() {
- return !servers.isEmpty();
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("DmaapSimRestServer [servers=");
- builder.append(servers);
- builder.append("]");
- return builder.toString();
- }
-
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java
deleted file mode 100644
index a3eebda00..000000000
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.models.sim.dmaap.rest;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provider that serializes and de-serializes JSON via gson.
- */
-@Provider
-@Consumes(MediaType.APPLICATION_JSON)
-@Produces(MediaType.APPLICATION_JSON)
-public class JsonMessageBodyHandler implements MessageBodyReader<Object> {
- public static final Logger logger = LoggerFactory.getLogger(JsonMessageBodyHandler.class);
-
- @Override
- public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return MediaType.APPLICATION_JSON.equals(mediaType.toString());
- }
-
- @Override
- public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
- MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
- throws IOException {
-
- String jsonString = "";
- try (BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(entityStream))) {
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- jsonString += line;
- }
-
- return jsonString;
- }
- }
-}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java
new file mode 100644
index 000000000..3c903c82b
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * Provider that decodes "text/plain" messages.
+ */
+@Provider
+@Consumes(TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN)
+public class TextMessageBodyHandler implements MessageBodyReader<Object> {
+ public static final String MEDIA_TYPE_TEXT_PLAIN = "text/plain";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
+ return (mediaType != null && MEDIA_TYPE_TEXT_PLAIN.equals(mediaType.toString()));
+ }
+
+ @Override
+ public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+ List<Object> messages = new LinkedList<>();
+ String msg;
+ while ((msg = bufferedReader.readLine()) != null) {
+ messages.add(msg);
+ }
+
+ return messages;
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
index 899c0e081..b9e0efaaf 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,19 +21,15 @@
package org.onap.policy.models.sim.dmaap.startstop;
-import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer;
/**
* This class activates the DMaaP simulator as a complete service.
*/
public class DmaapSimActivator extends ServiceManagerContainer {
- /**
- * The DMaaP simulator REST API server.
- */
- private DmaapSimRestServer restServer;
/**
* Instantiate the activator for the DMaaP simulator as a complete service.
@@ -42,19 +39,11 @@ public class DmaapSimActivator extends ServiceManagerContainer {
public DmaapSimActivator(final DmaapSimParameterGroup dmaapSimParameterGroup) {
super("DMaaP Simulator");
- // @formatter:off
- addAction("DMaaP Simulator parameters",
- () -> ParameterService.register(dmaapSimParameterGroup),
- () -> ParameterService.deregister(dmaapSimParameterGroup.getName()));
-
- addAction("Create REST server",
- () -> restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()),
- () -> restServer = null
- );
+ DmaapSimProvider provider = new DmaapSimProvider(dmaapSimParameterGroup);
+ DmaapSimProvider.setInstance(provider);
+ addAction("Sim Provider", provider::start, provider::stop);
- addAction("REST server",
- () -> restServer.start(),
- () -> restServer.stop());
- // @formatter:on
+ DmaapSimRestServer restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters());
+ addAction("REST server", restServer::start, restServer::stop);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
index cf559f712..724c3dc35 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
@@ -26,13 +26,15 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.Arrays;
-
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.utils.resources.ResourceUtils;
import org.onap.policy.models.sim.dmaap.DmaapSimException;
import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
@@ -46,6 +48,9 @@ public class DmaapSimCommandLineArguments {
private static final int HELP_LINE_LENGTH = 120;
private final Options options;
+
+ @Getter
+ @Setter
private String configurationFilePath = null;
/**
@@ -145,7 +150,7 @@ public class DmaapSimCommandLineArguments {
* @throws DmaapSimException on command argument validation errors
*/
public void validate() throws DmaapSimException {
- validateReadableFile("DMaaP simulator configuration", configurationFilePath);
+ validateFileExists("DMaaP simulator configuration", configurationFilePath);
}
/**
@@ -175,15 +180,6 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Gets the configuration file path.
- *
- * @return the configuration file path
- */
- public String getConfigurationFilePath() {
- return configurationFilePath;
- }
-
- /**
* Gets the full expanded configuration file path.
*
* @return the configuration file path
@@ -193,16 +189,6 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Sets the configuration file path.
- *
- * @param configurationFilePath the configuration file path
- */
- public void setConfigurationFilePath(final String configurationFilePath) {
- this.configurationFilePath = configurationFilePath;
-
- }
-
- /**
* Check set configuration file path.
*
* @return true, if check set configuration file path
@@ -212,14 +198,14 @@ public class DmaapSimCommandLineArguments {
}
/**
- * Validate readable file.
+ * Validate file exists.
*
* @param fileTag the file tag
* @param fileName the file name
* @throws DmaapSimException on the file name passed as a parameter
*/
- private void validateReadableFile(final String fileTag, final String fileName) throws DmaapSimException {
- if (fileName == null || fileName.length() == 0) {
+ private void validateFileExists(final String fileTag, final String fileName) throws DmaapSimException {
+ if (StringUtils.isBlank(fileName)) {
throw new DmaapSimException(fileTag + " file was not specified as an argument");
}
@@ -233,11 +219,5 @@ public class DmaapSimCommandLineArguments {
if (!theFile.exists()) {
throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist");
}
- if (!theFile.isFile()) {
- throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file");
- }
- if (!theFile.canRead()) {
- throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable");
- }
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
index 878d008a8..7b4f41b07 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,9 +22,6 @@
package org.onap.policy.models.sim.dmaap.startstop;
import java.util.Arrays;
-
-import org.onap.policy.common.utils.services.Registry;
-import org.onap.policy.models.sim.dmaap.DmaapSimConstants;
import org.onap.policy.models.sim.dmaap.DmaapSimException;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
@@ -75,14 +73,12 @@ public class Main {
// Now, create the activator for the DMaaP Simulator service
activator = new DmaapSimActivator(parameterGroup);
- Registry.register(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR, activator);
// Start the activator
try {
activator.start();
} catch (final RuntimeException e) {
LOGGER.error("start of DMaaP simulator service failed, used parameters are {}", Arrays.toString(args), e);
- Registry.unregister(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR);
return;
}
@@ -110,7 +106,7 @@ public class Main {
parameterGroup = null;
// clear the DMaaP simulator activator
- if (activator != null) {
+ if (activator != null && activator.isAlive()) {
activator.stop();
}
}
@@ -128,8 +124,9 @@ public class Main {
public void run() {
try {
// Shutdown the DMaaP simulator service and wait for everything to stop
- activator.stop();
- } catch (final RuntimeException e) {
+ shutdown();
+
+ } catch (final RuntimeException | DmaapSimException e) {
LOGGER.warn("error occured during shut down of the DMaaP simulator service", e);
}
}
diff --git a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
index dd2477a24..e936eb034 100644
--- a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
+++ b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
@@ -1,5 +1,6 @@
{
"name": "DMaapSim",
+ "topicSweepSec": 900,
"restServerParameters": {
"host": "0.0.0.0",
"port": 3904