diff options
Diffstat (limited to 'dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java')
-rw-r--r-- | dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java | 48 |
1 files changed, 29 insertions, 19 deletions
diff --git a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java index fe8f7ed..0396005 100644 --- a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java +++ b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java @@ -24,11 +24,9 @@ import static org.apache.commons.text.StringEscapeUtils.unescapeJson; import static org.onap.dcae.analytics.model.AnalyticsHttpConstants.REQUEST_ID_HEADER_KEY; import static org.onap.dcae.analytics.model.AnalyticsModelConstants.ANALYTICS_REQUEST_ID_DELIMITER; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import java.util.Collections; +import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.stream.IntStream; @@ -36,15 +34,22 @@ import java.util.stream.IntStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.onap.dcae.analytics.model.AnalyticsHttpConstants; import org.onap.dcae.analytics.model.DmaapMrConstants; -import org.onap.dcae.analytics.web.exception.AnalyticsParsingException; +import org.onap.dcae.analytics.tca.core.exception.AnalyticsParsingException; +import org.onap.dcae.analytics.tca.core.util.LogSpec; import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.onap.dcae.utils.eelf.logger.api.log.EELFLogFactory; +import org.onap.dcae.utils.eelf.logger.api.log.EELFLogger; +import org.onap.dcae.utils.eelf.logger.api.spec.AuditLogSpec; +import org.onap.dcae.utils.eelf.logger.api.spec.ErrorLogSpec; import org.springframework.integration.splitter.AbstractMessageSplitter; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * DMaaP MR message splitter split the incoming messages into batch of given batch size * @@ -52,7 +57,7 @@ import org.springframework.messaging.Message; */ public class MrMessageSplitter extends AbstractMessageSplitter { - private static final Logger logger = LoggerFactory.getLogger(MrMessageSplitter.class); + private static final EELFLogger eelfLogger = EELFLogFactory.getLogger(MrMessageSplitter.class); private final ObjectMapper objectMapper; private final Integer batchSize; @@ -66,18 +71,23 @@ public class MrMessageSplitter extends AbstractMessageSplitter { @Override protected Object splitMessage(final Message<?> message) { - final List<String> dmaapMessages = convertJsonToStringMessages(String.class.cast(message.getPayload()).trim()); - final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders()); + final List<String> dmaapMessages = convertJsonToStringMessages(requestId, String.class.cast(message.getPayload()).trim()); + final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders()); - logger.info("Request Id: {}, Transaction Id: {}, Received new messages from DMaaP MR. Count: {}", - requestId, transactionId, dmaapMessages.size()); + final Date requestBeginTimestamp = AnalyticsHttpUtils.getTimestampFromHeaders(message.getHeaders(), + AnalyticsHttpConstants.REQUEST_BEGIN_TS_HEADER_KEY); + final AuditLogSpec auditLogSpec = LogSpec.createAuditLogSpec(requestId, requestBeginTimestamp); - final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize); + eelfLogger.auditLog().info("Request Id: {}, Transaction Id: {}, dmaapMessages: {}," + + " Received new messages from DMaaP MR. Count: {}", + auditLogSpec, requestId, transactionId, dmaapMessages.toString(), String.valueOf(dmaapMessages.size())); - logger.debug("Request Id: {}, Transaction Id: {}, Max allowed messages per batch: {}. " + - "No of batches created: {}", requestId, transactionId, batchSize, messagePartitions.size()); + final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize); + eelfLogger.auditLog().info("Request Id: {}, Transaction Id: {}, Max allowed messages per batch: {}. " + + "No of batches created: {}", + auditLogSpec, requestId, transactionId, String.valueOf(batchSize), String.valueOf(messagePartitions.size())); // append batch id to request id header return messagePartitions.isEmpty() ? null : IntStream.range(0, messagePartitions.size()) @@ -100,7 +110,7 @@ public class MrMessageSplitter extends AbstractMessageSplitter { * * @return List containing DMaaP MR Messages */ - private List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) { + private List<String> convertJsonToStringMessages(String requestId, @Nullable final String messagesJsonString) { final LinkedList<String> messages = new LinkedList<>(); @@ -128,9 +138,11 @@ public class MrMessageSplitter extends AbstractMessageSplitter { } } catch (IOException e) { - final String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " + + ErrorLogSpec errorLogSpec = LogSpec.createErrorLogSpec(requestId); + eelfLogger.errorLog().error("Unable to convert subscriber Json String to Messages. " + + "Subscriber Response String: {}, Json Error: {}", errorLogSpec, messagesJsonString, e.toString()); + String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " + "Subscriber Response String: %s, Json Error: %s", messagesJsonString, e); - logger.error(errorMessage, e); throw new AnalyticsParsingException(errorMessage, e); } @@ -183,6 +195,4 @@ public class MrMessageSplitter extends AbstractMessageSplitter { } return result; } - - } |