diff options
Diffstat (limited to 'models-sim/models-sim-dmaap/src/main')
16 files changed, 779 insertions, 435 deletions
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java deleted file mode 100644 index 1682eb991..000000000 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.models.sim.dmaap; - -/** - * Names of various items contained in the Registry. - */ -public class DmaapSimConstants { - - // Registry keys - public static final String REG_DMAAP_SIM_ACTIVATOR = "object:activator/dmaap-sim"; - - private DmaapSimConstants() { - super(); - } -} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java index caae287b8..11da57582 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,8 @@ package org.onap.policy.models.sim.dmaap.parameters; import lombok.Getter; - import org.onap.policy.common.parameters.ParameterGroupImpl; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; @@ -36,6 +37,14 @@ public class DmaapSimParameterGroup extends ParameterGroupImpl { private RestServerParameters restServerParameters; /** + * Frequency, in seconds, with which to sweep the topics of idle consumers. On each + * sweep cycle, if a consumer group has had no new poll requests since the last sweep + * cycle, it is removed. + */ + @Min(1) + private long topicSweepSec; + + /** * Create the DMaaP simulator parameter group. * * @param name the parameter group name diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java index 8eb76ec00..252054504 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +39,7 @@ public class DmaapSimParameterHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimParameterHandler.class); - private static final Coder CODER = new StandardCoder(); + private final Coder coder = new StandardCoder(); /** * Read the parameters from the parameter file. @@ -54,7 +55,7 @@ public class DmaapSimParameterHandler { try { // Read the parameters from JSON File file = new File(arguments.getFullConfigurationFilePath()); - dmaapSimParameterGroup = CODER.decode(file, DmaapSimParameterGroup.class); + dmaapSimParameterGroup = coder.decode(file, DmaapSimParameterGroup.class); } catch (final CoderException e) { final String errorMessage = "error reading parameters from \"" + arguments.getConfigurationFilePath() + "\"\n" + "(" + e.getClass().getSimpleName() + "):" + e.getMessage(); diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java index c7269f66d..8fb2cffa3 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +22,14 @@ package org.onap.policy.models.sim.dmaap.parameters; import lombok.Getter; + import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Class to hold all parameters needed for DMaaP simulator rest server. + * Class to hold all parameters needed for rest server. */ @NotNull @NotBlank @@ -39,6 +41,6 @@ public class RestServerParameters extends ParameterGroupImpl { private int port; public RestServerParameters() { - super("RestServerParameters"); + super(RestServerParameters.class.getSimpleName()); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java new file mode 100644 index 000000000..737151339 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java @@ -0,0 +1,190 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data associated with a consumer group. All consumer instances within a group share the + * same data object. + */ +public class ConsumerGroupData { + private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupData.class); + + /** + * Returned when messages can no longer be read from this consumer group object, + * because it is being removed from the topic. {@link #UNREADABLE_LIST} must not be + * the same list as Collections.emptyList(), thus we wrap it. + */ + public static final List<String> UNREADABLE_LIST = Collections.unmodifiableList(Collections.emptyList()); + + /** + * Returned when there are no messages read. Collections.emptyList() is already + * unmodifiable, thus no need to wrap it. + */ + private static final List<String> EMPTY_LIST = Collections.emptyList(); + + /** + * This is locked while fields other than {@link #messageQueue} are updated. + */ + private final Object lockit = new Object(); + + /** + * Number of sweep cycles that have occurred since a consumer has attempted to read + * from the queue. This consumer group should be removed once this count exceeds + * {@code 1}, provided {@link #nreaders} is zero. + */ + private int nsweeps = 0; + + /** + * Number of consumers that are currently attempting to read from the queue. This + * consumer group should not be removed as long as this is non-zero. + */ + private int nreaders = 0; + + /** + * Message queue. + */ + private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(); + + + /** + * Constructs the object. + * + * @param topicName name of the topic with which this object is associated + * @param groupName name of the consumer group with which this object is associated + */ + public ConsumerGroupData(String topicName, String groupName) { + logger.info("Topic {}: add consumer group: {}", topicName, groupName); + } + + /** + * Determines if this consumer group should be removed. This should be invoked once + * during each sweep cycle. When this returns {@code true}, this consumer group should + * be immediately discarded, as any readers will sit in a spin loop waiting for it to + * be discarded. + * + * @return {@code true} if this consumer group should be removed, {@code false} + * otherwise + */ + public boolean shouldRemove() { + synchronized (lockit) { + return (nreaders == 0 && ++nsweeps > 1); + } + } + + /** + * Reads messages from the queue, blocking if necessary. + * + * @param maxRead maximum number of messages to read + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return a list of messages read from the queue, empty if no messages became + * available before the wait time elapsed, or {@link #UNREADABLE_LIST} if this + * consumer group object is no longer active + * @throws InterruptedException if this thread was interrupted while waiting for the + * first message + */ + public List<String> read(int maxRead, long waitMs) throws InterruptedException { + + synchronized (lockit) { + if (nsweeps > 1 && nreaders == 0) { + // cannot use this consumer group object anymore + return UNREADABLE_LIST; + } + + ++nreaders; + } + + /* + * Note: do EVERYTHING inside of the "try" block, so that the "finally" block can + * update the reader count. + * + * Do NOT hold the lockit while we're polling, as poll() may block for a while. + */ + try { + // always read at least one message + int maxRead2 = Math.max(1, maxRead); + long waitMs2 = Math.max(0, waitMs); + + // perform a blocking read of the queue + String obj = getNextMessage(waitMs2); + if (obj == null) { + return EMPTY_LIST; + } + + /* + * List should hold all messages from the queue PLUS the one we already have. + * Note: it's possible for additional messages to be added to the queue while + * we're reading from it. In that case, the list will grow as needed. + */ + List<String> lst = new ArrayList<>(Math.min(maxRead2, messageQueue.size() + 1)); + lst.add(obj); + + // perform NON-blocking read of subsequent messages + for (int x = 1; x < maxRead2; ++x) { + if ((obj = messageQueue.poll()) == null) { + break; + } + + lst.add(obj); + } + + return lst; + + } finally { + synchronized (lockit) { + --nreaders; + nsweeps = 0; + } + } + } + + /** + * Writes messages to the queue. + * + * @param messages messages to be written to the queue + */ + public void write(List<String> messages) { + messageQueue.addAll(messages); + } + + // the following methods may be overridden by junit tests + + /** + * Gets the next message from the queue. + * + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return the next message, or {@code null} if no messages became available before + * the wait time elapsed + * @throws InterruptedException if this thread was interrupted while waiting for the + * message + */ + protected String getNextMessage(long waitMs) throws InterruptedException { + return messageQueue.poll(waitMs, TimeUnit.MILLISECONDS); + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java index 9de29cdac..d11d1b397 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,18 +21,20 @@ package org.onap.policy.models.sim.dmaap.provider; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.tuple.MutablePair; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import javax.ws.rs.core.Response.Status; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@est.tech) */ -public class DmaapSimProvider { +public class DmaapSimProvider extends ServiceManagerContainer { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class); - // Recurring string constants - private static final String TOPIC_TAG = "Topic:"; + @Getter + @Setter + private static DmaapSimProvider instance; - // Time for a get to wait before checking of a message has come - private static final long DMAAP_SIM_WAIT_TIME = 50; + /** + * Maps a topic name to its data. + */ + private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>(); - // recurring constants - private static final String WITH_TIMEOUT = " with timeout "; + /** + * Thread used to remove idle consumers from the topics. + */ + private ScheduledExecutorService timerPool; - // The map of topic messages - private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>(); - // The map of topic messages - private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap = - new LinkedHashMap<>(); + /** + * Constructs the object. + * + * @param params parameters + */ + public DmaapSimProvider(DmaapSimParameterGroup params) { + addAction("Topic Sweeper", () -> { + timerPool = makeTimerPool(); + timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(), + TimeUnit.SECONDS); + }, () -> timerPool.shutdown()); + } /** * Process a DMaaP message. * - * @param topicName The topic name + * @param topicName the topic name * @param dmaapMessage the message to process * @return a response to the message */ + @SuppressWarnings("unchecked") public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) { - LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage); - - synchronized (topicMessageMap) { - SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName); - if (messageMap == null) { - messageMap = new TreeMap<>(); - topicMessageMap.put(topicName, messageMap); - LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map"); - } + LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage); - int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1); + List<Object> lst; - messageMap.put(nextKey, dmaapMessage); - LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); + if (dmaapMessage instanceof List) { + lst = (List<Object>) dmaapMessage; + } else { + lst = Collections.singletonList(dmaapMessage); } - return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build(); + TopicData topic = topic2data.get(topicName); + + /* + * Write all messages and return the count. If the topic doesn't exist yet, then + * there are no subscribers to receive the messages, thus treat it as if all + * messages were published. + */ + int nmessages = (topic != null ? topic.write(lst) : lst.size()); + + Map<String, Object> map = new LinkedHashMap<>(); + map.put("serverTimeMs", 0); + map.put("count", nmessages); + + return Response.status(Response.Status.OK).entity(map).build(); } /** @@ -92,102 +115,66 @@ public class DmaapSimProvider { * @param topicName The topic to wait on * @param consumerGroup the consumer group that is waiting * @param consumerId the consumer ID that is waiting - * @param timeout the length of time to wait for + * @param limit the maximum number of messages to get + * @param timeoutMs the length of time to wait for * @return the DMaaP message or */ public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId, - final int timeout) { + final int limit, final long timeoutMs) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); + LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup, + consumerId, limit, timeoutMs); - MutablePair<Integer, String> consumerGroupPair = null; - - synchronized (consumerGroupsMap) { - Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName); - if (consumerGroupMap == null) { - consumerGroupMap = new LinkedHashMap<>(); - consumerGroupsMap.put(topicName, consumerGroupMap); - LOGGER.trace( - TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup); - } - - consumerGroupPair = consumerGroupMap.get(consumerGroup); - if (consumerGroupPair == null) { - consumerGroupPair = new MutablePair<>(-1, consumerId); - consumerGroupMap.put(consumerGroup, consumerGroupPair); - LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group " - + consumerGroup + ":" + consumerId); - } - } + try { + List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit, + timeoutMs); - long timeOfTimeout = System.currentTimeMillis() + timeout; + if (lst.isEmpty() && timeoutMs > 0) { + LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId); + return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build(); - do { - Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair); - if (waitingMessages != null) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages); - return Response.status(Response.Status.OK).entity(waitingMessages).build(); + } else { + LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(), + consumerId); + return Response.status(Status.OK).entity(lst).build(); } - try { - TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME); - } catch (InterruptedException ie) { - String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID " - + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout; - LOGGER.warn(errorMessage, ie); - Thread.currentThread().interrupt(); - throw new DmaapSimRuntimeException(errorMessage, ie); - } + } catch (InterruptedException e) { + LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup, + consumerId, e); + Thread.currentThread().interrupt(); + return Response.status(Status.GONE).entity(Collections.emptyList()).build(); } - while (timeOfTimeout > System.currentTimeMillis()); - - LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); - return Response.status(Response.Status.REQUEST_TIMEOUT).build(); } /** - * Return any messages on this topic with a message number greater than the supplied message number. - * - * @param topicName the topic name to check - * @param consumerGroupPair the pair with the information on the last message retrieved - * @return the messages or null if there are none + * Task to remove idle consumers from each topic. */ - private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) { - String foundMessageList = "["; - - synchronized (topicMessageMap) { - SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName); - if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) { - return null; - } + private class SweeperTask implements Runnable { + @Override + public void run() { + topic2data.values().forEach(TopicData::removeIdleConsumers); + } + } - boolean first = true; - for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) { - if (first) { - first = false; - } else { - foundMessageList += ","; - } - try { - foundMessageList += new StandardCoder().encode(dmaapMessage); - } catch (CoderException ce) { - String errorMessage = "Encoding error on message on DMaaP topic " + topicName; - LOGGER.warn(errorMessage, ce); - return null; - } - } - foundMessageList += ']'; + // the following methods may be overridden by junit tests - LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() - + " to " + messageMap.lastKey()); - synchronized (consumerGroupsMap) { - consumerGroupPair.setLeft(messageMap.lastKey()); - } - } + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } - return (foundMessageList.length() < 3 ? null : foundMessageList); + /** + * Makes a new topic. + * + * @param topicName topic name + * @return a new topic + */ + protected TopicData makeTopicData(String topicName) { + return new TopicData(topicName); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java new file mode 100644 index 000000000..2737f455b --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java @@ -0,0 +1,201 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data associated with a topic. + * + * <p/> + * Note: for ease of implementation, this adds a topic when a consumer polls it rather + * than when a publisher writes to it. This is the opposite of how the real DMaaP works. + * As a result, this will never return a topic-not-found message to the consumer. + */ +public class TopicData { + private static final Logger logger = LoggerFactory.getLogger(TopicData.class); + + /** + * Name of the topic with which this data is associated. + */ + private final String topicName; + + /** + * Maps a consumer group name to its associated data. + */ + private final Map<String, ConsumerGroupData> group2data = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param topicName name of the topic with which this object is associated + */ + public TopicData(String topicName) { + logger.info("Topic {}: added", topicName); + this.topicName = topicName; + } + + /** + * Removes idle consumers from the topic. This is typically called once during each + * sweep cycle. + */ + public void removeIdleConsumers() { + Iterator<Entry<String, ConsumerGroupData>> iter = group2data.entrySet().iterator(); + while (iter.hasNext()) { + Entry<String, ConsumerGroupData> ent = iter.next(); + if (ent.getValue().shouldRemove()) { + /* + * We want the minimum amount of time to elapse between invoking + * shouldRemove() and iter.remove(), thus all other statements (e.g., + * logging) should be done AFTER iter.remove(). + */ + iter.remove(); + + logger.info("Topic {}: removed consumer group: {}", topicName, ent.getKey()); + } + } + } + + /** + * Reads from a particular consumer group's queue. + * + * @param consumerGroup name of the consumer group from which to read + * @param maxRead maximum number of messages to read + * @param waitMs time to wait, in milliseconds, if the queue is currently empty + * @return a list of messages read from the queue, empty if no messages became + * available before the wait time elapsed + * @throws InterruptedException if this thread was interrupted while waiting for the + * first message + */ + public List<String> read(String consumerGroup, int maxRead, long waitMs) throws InterruptedException { + /* + * It's possible that this thread may spin several times while waiting for + * removeIdleConsumers() to complete its call to iter.remove(), thus we create + * this closure once, rather than each time through the loop. + */ + Function<String, ConsumerGroupData> maker = this::makeData; + + // loop until we get a readable list + List<String> result; + + // @formatter:off + + do { + result = group2data.computeIfAbsent(consumerGroup, maker).read(maxRead, waitMs); + } + while (result == ConsumerGroupData.UNREADABLE_LIST); + + // @formatter:on + + return result; + } + + /** + * Writes messages to the queues of every consumer group. + * + * @param messages messages to be written to the queues + * @return the number of messages enqueued + */ + public int write(List<Object> messages) { + List<String> list = convertMessagesToStrings(messages); + + /* + * We don't care if a consumer group is deleted from the map while we're adding + * messages to it, as those messages will simply be ignored (and discarded by the + * garbage collector). + */ + for (ConsumerGroupData data : group2data.values()) { + data.write(list); + } + + return list.size(); + } + + /** + * Converts a list of message objects to a list of message strings. If a message + * cannot be converted for some reason, then it is not added to the result list, thus + * the result list may be shorted than the original input list. + * + * @param messages objects to be converted + * @return a list of message strings + */ + protected List<String> convertMessagesToStrings(List<Object> messages) { + Coder coder = new StandardCoder(); + List<String> list = new ArrayList<>(messages.size()); + + for (Object msg : messages) { + String str = convertMessageToString(msg, coder); + if (str != null) { + list.add(str); + } + } + + return list; + } + + /** + * Converts a message object to a message string. + * + * @param message message to be converted + * @param coder used to encode the message as a string + * @return the message string, or {@code null} if it cannot be converted + */ + protected String convertMessageToString(Object message, Coder coder) { + if (message == null) { + return null; + } + + if (message instanceof String) { + return message.toString(); + } + + try { + return coder.encode(message); + } catch (CoderException e) { + logger.warn("cannot encode {}", message, e); + return null; + } + } + + // this may be overridden by junit tests + + /** + * Makes data for a consumer group. + * + * @param consumerGroup name of the consumer group to make + * @return new consumer group data + */ + protected ConsumerGroupData makeData(String consumerGroup) { + return new ConsumerGroupData(topicName, consumerGroup); + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java index e269ac00b..dbd9422ef 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java @@ -1,66 +1,178 @@ -/* - * ============LICENSE_START======================================================= ONAP - * ================================================================================ Copyright (C) 2019 AT&T Intellectual - * Property. All rights reserved. ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.models.sim.dmaap.rest; import java.io.BufferedReader; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.Reader; import java.lang.annotation.Annotation; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.Provider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; /** - * Provider that serializes and de-serializes JSON via gson. + * Provider that decodes "application/cambria" messages. */ @Provider @Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA) -@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA) public class CambriaMessageBodyHandler implements MessageBodyReader<Object> { - // Media type for Cambria public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria"; - public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class); + /** + * Maximum length of a message or partition. + */ + private static final int MAX_LEN = 10000000; + + /** + * Maximum digits in a length field. + */ + private static final int MAX_DIGITS = 10; @Override public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()); + return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString())); } @Override - public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, String> httpHeaders, InputStream entityStream) - throws IOException { - - String cambriaString = ""; - try (BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(entityStream))) { - String line; - while ((line = bufferedReader.readLine()) != null) { - cambriaString += line; + public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException { + + try (BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) { + List<Object> messages = new LinkedList<>(); + String msg; + while ((msg = readMessage(bufferedReader)) != null) { + messages.add(msg); + } + + return messages; + } + } + + /** + * Reads a message. + * + * @param reader source from which to read + * @return the message that was read, or {@code null} if there are no more messages + * @throws IOException if an error occurs + */ + private String readMessage(Reader reader) throws IOException { + if (!skipWhitespace(reader)) { + return null; + } + + int partlen = readLength(reader); + if (partlen > MAX_LEN) { + throw new IOException("invalid partition length"); + } + + int msglen = readLength(reader); + if (msglen > MAX_LEN) { + throw new IOException("invalid message length"); + } + + // skip over the partition + reader.skip(partlen); + + return readString(reader, msglen); + } + + /** + * Skips whitespace. + * + * @param reader source from which to read + * @return {@code true} if there is another character after the whitespace, + * {@code false} if the end of the stream has been reached + * @throws IOException if an error occurs + */ + private boolean skipWhitespace(Reader reader) throws IOException { + int chr; + + do { + reader.mark(1); + if ((chr = reader.read()) < 0) { + return false; + } + } + while (Character.isWhitespace(chr)); + + // push the last character back onto the reader + reader.reset(); + + return true; + } + + /** + * Reads a length field, which is a number followed by ".". + * + * @param reader source from which to read + * @return the length, or -1 if EOF has been reached + * @throws IOException if an error occurs + */ + private int readLength(Reader reader) throws IOException { + StringBuilder bldr = new StringBuilder(MAX_DIGITS); + + int chr; + for (int x = 0; x < MAX_DIGITS; ++x) { + if ((chr = reader.read()) < 0) { + throw new EOFException("missing '.' in 'length' field"); } - return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length()); + if (chr == '.') { + String text = bldr.toString().trim(); + return (text.isEmpty() ? 0 : Integer.valueOf(text)); + } + + if (!Character.isDigit(chr)) { + throw new IOException("invalid character in 'length' field"); + } + + bldr.append((char) chr); } + + throw new IOException("too many digits in 'length' field"); + } + + /** + * Reads a string. + * + * @param reader source from which to read + * @param len length of the string (i.e., number of characters to read) + * @return the string that was read + * @throws IOException if an error occurs + */ + private String readString(Reader reader, int len) throws IOException { + char[] buf = new char[len]; + IOUtils.readFully(reader, buf); + + return new String(buf); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java index e3fdd4884..8339d0e64 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,24 +21,24 @@ package org.onap.policy.models.sim.dmaap.rest; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; - import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; /** * Class to provide REST endpoints for DMaaP simulator component statistics. */ @Path("/events") +@Produces(DmaapSimRestControllerV1.MEDIA_TYPE_APPLICATION_JSON) public class DmaapSimRestControllerV1 extends BaseRestControllerV1 { + public static final String MEDIA_TYPE_APPLICATION_JSON = "application/json"; /** * Get a DMaaP message. @@ -45,72 +46,33 @@ public class DmaapSimRestControllerV1 extends BaseRestControllerV1 { * @param topicName topic to get message from * @param consumerGroup consumer group that is getting the message * @param consumerId consumer ID that is getting the message - * @param timeout timeout for the message + * @param timeoutMs timeout for the message * @return the message */ @GET @Path("{topicName}/{consumerGroup}/{consumerId}") - // @formatter:off - @ApiOperation( - value = "Get a DMaaP event on a topic", - notes = "Returns an event on a DMaaP topic", - response = Object.class, - authorizations = - @Authorization(value = AUTHORIZATION_TYPE) - ) - @ApiResponses( - value = { - @ApiResponse( - code = AUTHENTICATION_ERROR_CODE, - message = AUTHENTICATION_ERROR_MESSAGE), - @ApiResponse( - code = AUTHORIZATION_ERROR_CODE, - message = AUTHORIZATION_ERROR_MESSAGE), - @ApiResponse( - code = SERVER_ERROR_CODE, - message = SERVER_ERROR_MESSAGE) - } - ) - // @formatter:on - public Response getDmaaapMessage(@PathParam("topicName") final String topicName, - @PathParam("consumerGroup") final String consumerGroup, @PathParam("consumerId") final String consumerId, - @QueryParam("timeout") final int timeout) { + public Response getDmaapMessage(@PathParam("topicName") final String topicName, + @PathParam("consumerGroup") final String consumerGroup, + @PathParam("consumerId") final String consumerId, + @QueryParam("limit") @DefaultValue("1") final int limit, + @QueryParam("timeout") @DefaultValue("15000") final long timeoutMs) { - return new DmaapSimProvider().processDmaapMessageGet(topicName, consumerGroup, consumerId, timeout); + return DmaapSimProvider.getInstance().processDmaapMessageGet(topicName, consumerGroup, consumerId, limit, + timeoutMs); } /** * Post a DMaaP message. * - * @param topicName topic to get message from415 + * @param topicName topic to get message from * @return the response to the post */ @POST @Path("{topicName}") - // @formatter:off - @ApiOperation( - value = "Post a DMaaP event on a topic", - notes = "Returns an event on a DMaaP topic", - response = Response.class, - authorizations = - @Authorization(value = AUTHORIZATION_TYPE) - ) - @ApiResponses( - value = { - @ApiResponse( - code = AUTHENTICATION_ERROR_CODE, - message = AUTHENTICATION_ERROR_MESSAGE), - @ApiResponse( - code = AUTHORIZATION_ERROR_CODE, - message = AUTHORIZATION_ERROR_MESSAGE), - @ApiResponse( - code = SERVER_ERROR_CODE, - message = SERVER_ERROR_MESSAGE) - } - ) - // @formatter:on - public Response postDmaaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) { + @Consumes(value = {CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA, + TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN, MEDIA_TYPE_APPLICATION_JSON}) + public Response postDmaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) { - return new DmaapSimProvider().processDmaapMessagePut(topicName, dmaapMessage); + return DmaapSimProvider.getInstance().processDmaapMessagePut(topicName, dmaapMessage); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java index 28de42c21..b05a0fe1a 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java @@ -21,28 +21,21 @@ package org.onap.policy.models.sim.dmaap.rest; -import java.util.ArrayList; import java.util.List; import java.util.Properties; - -import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.gson.GsonMessageBodyHandler; +import org.onap.policy.common.utils.services.ServiceManagerContainer; import org.onap.policy.models.sim.dmaap.parameters.RestServerParameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Class to manage life cycle of DMaaP Simulator rest server. */ -public class DmaapSimRestServer implements Startable { - - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimRestServer.class); +public class DmaapSimRestServer extends ServiceManagerContainer { - private List<HttpServletServer> servers = new ArrayList<>(); - - private RestServerParameters restServerParameters; + private final List<HttpServletServer> servers; /** * Constructor for instantiating DmaapSimRestServer. @@ -50,91 +43,40 @@ public class DmaapSimRestServer implements Startable { * @param restServerParameters the rest server parameters */ public DmaapSimRestServer(final RestServerParameters restServerParameters) { - this.restServerParameters = restServerParameters; - } + this.servers = HttpServletServerFactoryInstance.getServerFactory() + .build(getServerProperties(restServerParameters)); - /** - * {@inheritDoc}. - */ - @Override - public boolean start() { - try { - servers = HttpServletServerFactoryInstance.getServerFactory().build(getServerProperties()); - for (final HttpServletServer server : servers) { - server.start(); - } - } catch (final Exception exp) { - LOGGER.error("Failed to start DMaaP simulator http server", exp); - return false; + for (HttpServletServer server : this.servers) { + addAction("REST " + server.getName(), server::start, server::stop); } - return true; } /** - * Creates the server properties object using restServerParameters. + * Creates a set of properties, suitable for building a REST server, from the + * parameters. * - * @return the properties object + * @param restServerParameters parameters from which to build the properties + * @return a set of properties representing the given parameters */ - private Properties getServerProperties() { + public static Properties getServerProperties(RestServerParameters restServerParameters) { final Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName()); final String svcpfx = - PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); + PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost()); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, - Integer.toString(restServerParameters.getPort())); + Integer.toString(restServerParameters.getPort())); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - String.join(",", DmaapSimRestControllerV1.class.getName())); + DmaapSimRestControllerV1.class.getName()); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); - props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true"); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false"); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, - CambriaMessageBodyHandler.class.getName() + "," + JsonMessageBodyHandler.class.getName()); + String.join(",", CambriaMessageBodyHandler.class.getName(), + GsonMessageBodyHandler.class.getName(), + TextMessageBodyHandler.class.getName())); return props; } - - /** - * {@inheritDoc}. - */ - @Override - public boolean stop() { - for (final HttpServletServer server : servers) { - try { - server.stop(); - } catch (final Exception exp) { - LOGGER.error("Failed to stop DMaaP simulator http server", exp); - } - } - return true; - } - - /** - * {@inheritDoc}. - */ - @Override - public void shutdown() { - stop(); - } - - /** - * {@inheritDoc}. - */ - @Override - public boolean isAlive() { - return !servers.isEmpty(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("DmaapSimRestServer [servers="); - builder.append(servers); - builder.append("]"); - return builder.toString(); - } - } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java deleted file mode 100644 index a3eebda00..000000000 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * ============LICENSE_START======================================================= ONAP - * ================================================================================ Copyright (C) 2019 AT&T Intellectual - * Property. All rights reserved. ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.models.sim.dmaap.rest; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; -import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyReader; -import javax.ws.rs.ext.Provider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provider that serializes and de-serializes JSON via gson. - */ -@Provider -@Consumes(MediaType.APPLICATION_JSON) -@Produces(MediaType.APPLICATION_JSON) -public class JsonMessageBodyHandler implements MessageBodyReader<Object> { - public static final Logger logger = LoggerFactory.getLogger(JsonMessageBodyHandler.class); - - @Override - public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return MediaType.APPLICATION_JSON.equals(mediaType.toString()); - } - - @Override - public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, String> httpHeaders, InputStream entityStream) - throws IOException { - - String jsonString = ""; - try (BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(entityStream))) { - String line; - while ((line = bufferedReader.readLine()) != null) { - jsonString += line; - } - - return jsonString; - } - } -} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java new file mode 100644 index 000000000..3c903c82b --- /dev/null +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java @@ -0,0 +1,66 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import javax.ws.rs.Consumes; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; + +/** + * Provider that decodes "text/plain" messages. + */ +@Provider +@Consumes(TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN) +public class TextMessageBodyHandler implements MessageBodyReader<Object> { + public static final String MEDIA_TYPE_TEXT_PLAIN = "text/plain"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { + return (mediaType != null && MEDIA_TYPE_TEXT_PLAIN.equals(mediaType.toString())); + } + + @Override + public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException { + + try (BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) { + List<Object> messages = new LinkedList<>(); + String msg; + while ((msg = bufferedReader.readLine()) != null) { + messages.add(msg); + } + + return messages; + } + } +} diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java index 899c0e081..b9e0efaaf 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +21,15 @@ package org.onap.policy.models.sim.dmaap.startstop; -import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.services.ServiceManagerContainer; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer; /** * This class activates the DMaaP simulator as a complete service. */ public class DmaapSimActivator extends ServiceManagerContainer { - /** - * The DMaaP simulator REST API server. - */ - private DmaapSimRestServer restServer; /** * Instantiate the activator for the DMaaP simulator as a complete service. @@ -42,19 +39,11 @@ public class DmaapSimActivator extends ServiceManagerContainer { public DmaapSimActivator(final DmaapSimParameterGroup dmaapSimParameterGroup) { super("DMaaP Simulator"); - // @formatter:off - addAction("DMaaP Simulator parameters", - () -> ParameterService.register(dmaapSimParameterGroup), - () -> ParameterService.deregister(dmaapSimParameterGroup.getName())); - - addAction("Create REST server", - () -> restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()), - () -> restServer = null - ); + DmaapSimProvider provider = new DmaapSimProvider(dmaapSimParameterGroup); + DmaapSimProvider.setInstance(provider); + addAction("Sim Provider", provider::start, provider::stop); - addAction("REST server", - () -> restServer.start(), - () -> restServer.stop()); - // @formatter:on + DmaapSimRestServer restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()); + addAction("REST server", restServer::start, restServer::stop); } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java index cf559f712..724c3dc35 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java @@ -26,13 +26,15 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.URL; import java.util.Arrays; - +import lombok.Getter; +import lombok.Setter; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.utils.resources.ResourceUtils; import org.onap.policy.models.sim.dmaap.DmaapSimException; import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; @@ -46,6 +48,9 @@ public class DmaapSimCommandLineArguments { private static final int HELP_LINE_LENGTH = 120; private final Options options; + + @Getter + @Setter private String configurationFilePath = null; /** @@ -145,7 +150,7 @@ public class DmaapSimCommandLineArguments { * @throws DmaapSimException on command argument validation errors */ public void validate() throws DmaapSimException { - validateReadableFile("DMaaP simulator configuration", configurationFilePath); + validateFileExists("DMaaP simulator configuration", configurationFilePath); } /** @@ -175,15 +180,6 @@ public class DmaapSimCommandLineArguments { } /** - * Gets the configuration file path. - * - * @return the configuration file path - */ - public String getConfigurationFilePath() { - return configurationFilePath; - } - - /** * Gets the full expanded configuration file path. * * @return the configuration file path @@ -193,16 +189,6 @@ public class DmaapSimCommandLineArguments { } /** - * Sets the configuration file path. - * - * @param configurationFilePath the configuration file path - */ - public void setConfigurationFilePath(final String configurationFilePath) { - this.configurationFilePath = configurationFilePath; - - } - - /** * Check set configuration file path. * * @return true, if check set configuration file path @@ -212,14 +198,14 @@ public class DmaapSimCommandLineArguments { } /** - * Validate readable file. + * Validate file exists. * * @param fileTag the file tag * @param fileName the file name * @throws DmaapSimException on the file name passed as a parameter */ - private void validateReadableFile(final String fileTag, final String fileName) throws DmaapSimException { - if (fileName == null || fileName.length() == 0) { + private void validateFileExists(final String fileTag, final String fileName) throws DmaapSimException { + if (StringUtils.isBlank(fileName)) { throw new DmaapSimException(fileTag + " file was not specified as an argument"); } @@ -233,11 +219,5 @@ public class DmaapSimCommandLineArguments { if (!theFile.exists()) { throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist"); } - if (!theFile.isFile()) { - throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file"); - } - if (!theFile.canRead()) { - throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable"); - } } } diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java index 878d008a8..7b4f41b07 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +22,6 @@ package org.onap.policy.models.sim.dmaap.startstop; import java.util.Arrays; - -import org.onap.policy.common.utils.services.Registry; -import org.onap.policy.models.sim.dmaap.DmaapSimConstants; import org.onap.policy.models.sim.dmaap.DmaapSimException; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; @@ -75,14 +73,12 @@ public class Main { // Now, create the activator for the DMaaP Simulator service activator = new DmaapSimActivator(parameterGroup); - Registry.register(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR, activator); // Start the activator try { activator.start(); } catch (final RuntimeException e) { LOGGER.error("start of DMaaP simulator service failed, used parameters are {}", Arrays.toString(args), e); - Registry.unregister(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR); return; } @@ -110,7 +106,7 @@ public class Main { parameterGroup = null; // clear the DMaaP simulator activator - if (activator != null) { + if (activator != null && activator.isAlive()) { activator.stop(); } } @@ -128,8 +124,9 @@ public class Main { public void run() { try { // Shutdown the DMaaP simulator service and wait for everything to stop - activator.stop(); - } catch (final RuntimeException e) { + shutdown(); + + } catch (final RuntimeException | DmaapSimException e) { LOGGER.warn("error occured during shut down of the DMaaP simulator service", e); } } diff --git a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json index dd2477a24..e936eb034 100644 --- a/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json +++ b/models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json @@ -1,5 +1,6 @@ { "name": "DMaapSim", + "topicSweepSec": 900, "restServerParameters": { "host": "0.0.0.0", "port": 3904 |