diff options
author | 2018-08-15 11:46:10 -0400 | |
---|---|---|
committer | 2018-08-16 11:09:15 -0400 | |
commit | 7a2c23b3ad83eab0eed5b990c70a1603447d5ee5 (patch) | |
tree | 24293333fd3cc566c1d77f9c9eedeb034dce9c6c /dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java | |
parent | 9650bd18f6b88721628ebedac2575b44e1b0028e (diff) |
Standalone TCA with EELF Logger
Issue-ID: DCAEGEN2-633
Change-Id: I4da76b532021c0d6248455e7bd6e77f4614c35a7
Signed-off-by: Singla, Rajiv (rs153v) <rs153v@att.com>
Diffstat (limited to 'dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java')
-rw-r--r-- | dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java new file mode 100644 index 0000000..fe8f7ed --- /dev/null +++ b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java @@ -0,0 +1,188 @@ +/* + * ================================================================================ + * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ + +package org.onap.dcae.analytics.web.dmaap; + +import static org.apache.commons.text.StringEscapeUtils.unescapeJava; +import static org.apache.commons.text.StringEscapeUtils.unescapeJson; +import static org.onap.dcae.analytics.model.AnalyticsHttpConstants.REQUEST_ID_HEADER_KEY; +import static org.onap.dcae.analytics.model.AnalyticsModelConstants.ANALYTICS_REQUEST_ID_DELIMITER; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.onap.dcae.analytics.model.DmaapMrConstants; +import org.onap.dcae.analytics.web.exception.AnalyticsParsingException; +import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.integration.splitter.AbstractMessageSplitter; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; + +/** + * DMaaP MR message splitter split the incoming messages into batch of given batch size + * + * @author Rajiv Singla + */ +public class MrMessageSplitter extends AbstractMessageSplitter { + + private static final Logger logger = LoggerFactory.getLogger(MrMessageSplitter.class); + + private final ObjectMapper objectMapper; + private final Integer batchSize; + + public MrMessageSplitter(@Nonnull final ObjectMapper objectMapper, + @Nonnull final Integer batchSize) { + this.objectMapper = objectMapper; + this.batchSize = batchSize; + } + + @Override + protected Object splitMessage(final Message<?> message) { + + final List<String> dmaapMessages = convertJsonToStringMessages(String.class.cast(message.getPayload()).trim()); + + final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders()); + final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders()); + + logger.info("Request Id: {}, Transaction Id: {}, Received new messages from DMaaP MR. Count: {}", + requestId, transactionId, dmaapMessages.size()); + + final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize); + + logger.debug("Request Id: {}, Transaction Id: {}, Max allowed messages per batch: {}. " + + "No of batches created: {}", requestId, transactionId, batchSize, messagePartitions.size()); + + // append batch id to request id header + return messagePartitions.isEmpty() ? null : IntStream.range(0, messagePartitions.size()) + .mapToObj(batchIndex -> + MessageBuilder + .withPayload(messagePartitions.get(0)) + .copyHeaders(message.getHeaders()) + .setHeader(REQUEST_ID_HEADER_KEY, + requestId + ANALYTICS_REQUEST_ID_DELIMITER + batchIndex) + .build() + + ); + } + + /** + * Converts DMaaP MR subscriber messages json string to List of messages. If message Json String is empty + * or null + * + * @param messagesJsonString json messages String + * + * @return List containing DMaaP MR Messages + */ + private List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) { + + final LinkedList<String> messages = new LinkedList<>(); + + // If message string is not null or not empty parse json message array to List of string messages + if (messagesJsonString != null && !messagesJsonString.trim().isEmpty() + && !DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING.equals(messagesJsonString.trim())) { + + try { + // get root node + final JsonNode rootNode = objectMapper.readTree(messagesJsonString); + // iterate over root node and parse arrays messages + for (JsonNode jsonNode : rootNode) { + // if array parse it is array of messages + final String incomingMessageString = jsonNode.toString(); + if (jsonNode.isArray()) { + final List messageList = objectMapper.readValue(incomingMessageString, List.class); + for (Object message : messageList) { + final String jsonMessageString = objectMapper.writeValueAsString(message); + addUnescapedJsonToMessage(messages, jsonMessageString); + } + } else { + // parse it as object + addUnescapedJsonToMessage(messages, incomingMessageString); + } + } + + } catch (IOException e) { + final String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " + + "Subscriber Response String: %s, Json Error: %s", messagesJsonString, e); + logger.error(errorMessage, e); + throw new AnalyticsParsingException(errorMessage, e); + } + + } + return messages; + } + + /** + * Adds unescaped Json messages to given messages list + * + * @param messages message list in which unescaped messages will be added + * @param incomingMessageString incoming message string that may need to be escaped + */ + private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) { + if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) { + messages.add(unescapeJava(unescapeJson( + incomingMessageString.substring(1, incomingMessageString.length() - 1)))); + } else { + messages.add(unescapeJava(unescapeJson(incomingMessageString))); + } + } + + /** + * Partition list into multiple lists + * + * @param list input list that needs to be broken into chunks + * @param batchSize batch size for each list + * @param <E> element type of the list + * + * @return List containing list of entries of specified batch size + */ + private static <E> List<List<E>> partition(List<E> list, final Integer batchSize) { + + if (list == null || batchSize == null || batchSize <= 0 || list.size() < batchSize) { + return Collections.singletonList(list); + } + + final List<List<E>> result = new LinkedList<>(); + + for (int i = 0; i < list.size(); i++) { + + if (i == 0 || i % batchSize == 0) { + List<E> sublist = new LinkedList<>(); + result.add(sublist); + } + + final List<E> lastSubList = result.get(result.size() - 1); + lastSubList.add(list.get(i)); + + } + return result; + } + + +} |